+++ /dev/null
-from kafka import KafkaConsumer, KafkaProducer
-from kafka.errors import KafkaError
-import logging
-import yaml
-import json
-import sys
-import re
-import datetime
-import time
-
-logging.basicConfig(stream=sys.stdout,
- format='%(asctime)s %(message)s',
- datefmt='%m/%d/%Y %I:%M:%S %p',
- level=logging.INFO)
-log = logging.getLogger(__name__)
-
-
-def main():
- if len(sys.argv) <= 1:
- print ("Usage: metric-transformer.py kafka_server")
- exit()
- kafka_server = sys.argv.pop(1)
- kafka_host = kafka_server.split(':')[0]
- kafka_port = kafka_server.split(':')[1]
- transform_messages(kafka_host=kafka_host,
- kafka_port=kafka_port)
-
-
-def transform_messages(kafka_host, kafka_port):
- bootstrap_servers = '{}:{}'.format(kafka_host, kafka_port)
- producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
- key_serializer=str.encode,
- value_serializer=str.encode)
- consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
- key_deserializer=str.encode,
- value_deserializer=str.encode)
- consumer.subscribe(["metric_response"])
- for message in consumer:
- try:
- if message.topic == "metric_response":
- if message.key == "read_metric_data_response":
- values = json.loads(message.value)
- new_msg = {
- 'name': values['metric_name'],
- 'value': values['metrics_data']['metrics_series'][-1],
- 'labels': {
- 'resource_uuid': values['resource_uuid']
- }
- }
- log.info("Message to kafka exporter: %s", new_msg)
- future = producer.send(topic='kafka_exporter_topic', key='kafka-exporter-key',
- value=json.dumps(new_msg))
- response = future.get()
- log.info("Response from Kafka: %s", response)
- except Exception as e:
- log.exception("Error processing message: ")
-
-
-if __name__ == '__main__':
- main()
-