import json
import logging
import sys

from kafka import KafkaConsumer, KafkaProducer
from kafka.errors import KafkaError
# Send timestamped log records to stdout.
# The level is set explicitly: without it the root logger stays at WARNING
# and the log.info(...) progress messages emitted by this script are dropped.
logging.basicConfig(
    stream=sys.stdout,
    format='%(asctime)s %(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p',
    level=logging.INFO,
)
# Module-level logger shared by the functions in this file.
log = logging.getLogger(__name__)
# NOTE(review): the `def` line for these statements was not visible when this
# block was reviewed; the name `main` is assumed from the `__main__` guard at
# the bottom of the file -- confirm before merging.
def main():
    """Read the Kafka address from the command line and start transforming.

    Expects exactly one positional argument of the form ``host:port``.
    Prints a usage line and exits with status 1 when the argument is
    missing or has no ``:port`` part, instead of failing later with an
    ``IndexError``.
    """
    usage = "Usage: metric-transformer.py kafka_server"
    if len(sys.argv) <= 1:
        print(usage)
        sys.exit(1)

    # pop() removes the argument from sys.argv, as the original code did.
    kafka_server = sys.argv.pop(1)
    address_parts = kafka_server.split(':')
    if len(address_parts) < 2:
        # No port given: indexing [1] would raise IndexError.
        print(usage)
        sys.exit(1)

    kafka_host = address_parts[0]
    kafka_port = address_parts[1]
    transform_messages(kafka_host=kafka_host,
                       kafka_port=kafka_port)
def _decode_utf8(raw):
    """Decode a raw Kafka key/value (bytes) to text; pass ``None`` through."""
    return None if raw is None else raw.decode('utf-8')


def _to_exporter_message(values):
    """Build the outgoing exporter payload from a parsed metric response."""
    return {
        'name': values['metric_name'],
        # Only the last element of the series is forwarded.
        'value': values['metrics_data']['metrics_series'][-1],
        # NOTE(review): the key that wraps resource_uuid was not visible when
        # this block was reviewed; 'labels' is assumed -- confirm against the
        # consumer of 'kafka_exporter_topic'.
        'labels': {
            'resource_uuid': values['resource_uuid'],
        },
    }


def transform_messages(kafka_host, kafka_port):
    """Re-publish metric read responses in the exporter message format.

    Subscribes to the ``metric_response`` topic. For every message whose key
    is ``read_metric_data_response`` the JSON value is parsed, reduced to an
    exporter payload and sent to ``kafka_exporter_topic`` with the key
    ``kafka-exporter-key``. The loop runs for as long as the consumer yields
    messages.

    Args:
        kafka_host: Host name or address of the Kafka broker.
        kafka_port: Port of the Kafka broker.

    Errors raised while handling a single message are logged and the loop
    moves on to the next message.
    """
    bootstrap_servers = '{}:{}'.format(kafka_host, kafka_port)
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             key_serializer=str.encode,
                             value_serializer=str.encode)
    # Deserializers receive the raw bytes from the broker, so they must
    # decode; str.encode here would fail on bytes input and the key could
    # never compare equal to the text literal below.
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                             key_deserializer=_decode_utf8,
                             value_deserializer=_decode_utf8)
    consumer.subscribe(["metric_response"])
    for message in consumer:
        try:
            if message.topic != "metric_response":
                continue
            if message.key != "read_metric_data_response":
                continue

            new_msg = _to_exporter_message(json.loads(message.value))
            log.info("Message to kafka exporter: %s", new_msg)
            future = producer.send(topic='kafka_exporter_topic',
                                   key='kafka-exporter-key',
                                   value=json.dumps(new_msg))
            # Wait for the send result so failures surface here and are
            # logged together with the message that caused them.
            response = future.get()
            log.info("Response from Kafka: %s", response)
        except Exception:
            # Best-effort loop: one bad message must not stop the transformer.
            log.exception("Error processing message: ")
59 if __name__
== '__main__':