1 from kafka
import KafkaProducer
2 from kafka
.errors
import KafkaError
8 class KafkaProducer(object):
10 def __init__(self
, topic
, message
):
13 self
._message
= message
15 if "ZOOKEEPER_URI" in os
.environ
:
16 broker
= os
.getenv("ZOOKEEPER_URI")
18 broker
= "SO-ub.lxd:2181"
21 If the zookeeper broker URI is not set in the env, by default,
22 SO-ub.lxd container is taken as the host because an instance of
26 producer
= KafkaProducer(key_serializer
=str.encode
,
27 value_serializer
=lambda v
: json
.dumps(v
).encode('ascii'),
28 bootstrap_servers
=broker
, api_version
=(0,10))
31 def publish(self
, key
, message
, topic
=None):
33 future
= producer
.send('alarms', key
, payload
)
36 log
.exception("Error publishing to {} topic." .format(topic
))
39 record_metadata
= future
.get(timeout
=10)
40 self
._log
.debug("TOPIC:", record_metadata
.topic
)
41 self
._log
.debug("PARTITION:", record_metadata
.partition
)
42 self
._log
.debug("OFFSET:", record_metadata
.offset
)
46 def configure_alarm(self
, key
, message
, topic
):
49 "alarm_configuration":
51 "schema_version": 1.0,
52 "schema_type": "configure_alarm",
53 "alarm_configuration":
55 "metric_name": { "type": "string" },
56 "tenant_uuid": { "type": "string" },
57 "resource_uuid": { "type": "string" },
58 "description": { "type": "string" },
59 "severity": { "type": "string" },
60 "operation": { "type": "string" },
61 "threshold_value": { "type": "integer" },
62 "unit": { "type": "string" },
63 "statistic": { "type": "string" }
65 "required": [ "schema_version",
77 publish(key
, value
=json
.dumps(payload_configure
), topic
='alarms')
79 def notify_alarm(self
, key
, message
, topic
):
84 "schema_version": 1.0,
85 "schema_type": "notify_alarm",
88 "alarm_uuid": { "type": "string" },
89 "resource_uuid": { "type": "string" },
90 "description": { "type": "string" },
91 "tenant_uuid": { "type": "string" },
92 "severity": { "type" : ["integer", "string"] },
93 "status": { "type": "string" },
94 "start_date": { "type": "date-time" },
95 "update_date": { "type": "date-time" },
96 "cancel_date": { "type": "date-time" }
98 "required": [ "schema_version",
109 publish(key
, value
=json
.dumps(payload_notify
), topic
='alarms')
111 def alarm_ack(self
, key
, message
, topic
):
116 "schema_version": 1.0,
117 "schema_type": "alarm_ack",
120 "alarm_uuid": { "type": "string" },
121 "tenant_uuid": { "type": "string" },
122 "resource_uuid": { "type": "string" }
124 "required": [ "schema_version",
132 publish(key
, value
.json
.dumps(payload_ack
), topic
='alarms')
134 def configure_metrics(self
, key
, message
, topic
):
136 payload_configure_metrics
= {
139 "schema_version": 1.0,
140 "schema_type": "configure_metrics",
141 "tenant_uuid": { "type": "string" },
142 "metrics_configuration":
144 "metric_name": { "type": "string" },
145 "metric_unit": { "type": "string" },
146 "resource_uuid": { "type": "string" }
148 "required": [ "schema_version",
156 publish(key
, value
.json
.dumps(payload_configure_metrics
), topic
='metrics')
158 def metric_data_request(self
, key
, message
, topic
):
160 payload_metric_data_request
= {
161 "metric_data_request":
163 "schema_version": 1.0,
164 "schema_type": "metric_data_request",
165 "metric_name": { "type": "string" },
166 "resource_uuid": { "type": "string" },
167 "tenant_uuid": { "type": "string" },
168 "collection_period": { "type": "string" }
170 "required": ["schema_version",
178 publish(key
, value
.json
.dumps(payload_metric_data_request
), topic
='metrics')
180 def metric_data_response(self
, key
, message
, topic
):
182 payload_metric_data_response
= {
183 "metric_data_response":
185 "schema_version": 1.0,
186 "schema_type": "metric_data_response",
187 "metrics_name": { "type": "string" },
188 "resource_uuid": { "type": "string" },
189 "tenant_uuid": { "type": "string" },
192 "time_series": { "type": "array" },
193 "metrics_series": { "type": "array" }
196 "required": [ "schema_version",
205 publish(key
, value
.json
.dumps(payload_metric_data_response
), topic
='metrics')
207 def access_credentials(self
, key
, message
, topic
):
209 payload_access_credentials
= {
210 "access_credentials":
212 "schema_version": 1.0,
213 "schema_type": "vim_access_credentials",
214 "vim_type": { "type": "string" },
215 "required": [ "schema_version",
222 "vim_type": "openstack"
226 "openstack-site": { "type": "string" },
227 "user": { "type": "string" },
228 "password": { "type": "string",
229 "options": { "hidden": true
}},
230 "vim_tenant_name": { "type": "string" }
232 "required": [ "openstack_site",
242 "aws_site": { "type": "string" },
243 "user": { "type": "string" },
244 "password": { "type": "string",
245 "options": { "hidden": true
}},
246 "vim_tenant_name": { "type": "string" }
248 "required": [ "aws_site",
258 "vrops_site": { "type": "string" },
259 "vrops_user": { "type": "string" },
260 "vrops_password": { "type": "string",
261 "options": { "hidden": true
}},
262 "vcloud_site": { "type": "string" },
263 "admin_username": { "type": "string" },
264 "admin_password": { "type": "string",
265 "options": { "hidden": true
}},
266 "nsx_manager": { "type": "string" },
267 "nsx_user": { "type": "string" },
268 "nsx_password": { "type": "string",
269 "options": { "hidden": true
}},
270 "vcenter_ip": { "type": "string" },
271 "vcenter_port": { "type": "string" },
272 "vcenter_user": { "type": "string" },
273 "vcenter_password": { "type": "string",
274 "options": { "hidden": true
}},
275 "vim_tenant_name": { "type": "string" },
276 "org_name": { "type": "string" }
278 "required": [ "vrops_site",
294 publish(key
, value
.json
.dumps(payload_access_credentials
), topic
='access_credentials')