e3dbf0ed7bf5b23a21571a4f4c027796979c481d
[osm/devops.git] / installers / docker / osm_metrics / kafka-exporter / mon_to_kafka_exporter.py
1 from kafka import KafkaConsumer, KafkaProducer
2 from kafka.errors import KafkaError
3 import logging
4 import yaml
5 import json
6 import sys
7 import re
8 import datetime
9 import time
10
11 logging.basicConfig(stream=sys.stdout,
12 format='%(asctime)s %(message)s',
13 datefmt='%m/%d/%Y %I:%M:%S %p',
14 level=logging.INFO)
15 log = logging.getLogger(__name__)
16
17
18 def main():
19 if len(sys.argv) <= 1:
20 print ("Usage: metric-transformer.py kafka_server")
21 exit()
22 kafka_server = sys.argv.pop(1)
23 kafka_host = kafka_server.split(':')[0]
24 kafka_port = kafka_server.split(':')[1]
25 transform_messages(kafka_host=kafka_host,
26 kafka_port=kafka_port)
27
28
29 def transform_messages(kafka_host, kafka_port):
30 bootstrap_servers = '{}:{}'.format(kafka_host, kafka_port)
31 producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
32 key_serializer=str.encode,
33 value_serializer=str.encode)
34 consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
35 key_deserializer=str.encode,
36 value_deserializer=str.encode)
37 consumer.subscribe(["metric_response"])
38 for message in consumer:
39 try:
40 if message.topic == "metric_response":
41 if message.key == "read_metric_data_response":
42 values = json.loads(message.value)
43 new_msg = {
44 'name': values['metric_name'],
45 'value': values['metrics_data']['metrics_series'][-1],
46 'labels': {
47 'resource_uuid': values['resource_uuid']
48 }
49 }
50 log.info("Message to kafka exporter: %s", new_msg)
51 future = producer.send(topic='kafka_exporter_topic', key='kafka-exporter-key',
52 value=json.dumps(new_msg))
53 response = future.get()
54 log.info("Response from Kafka: %s", response)
55 except Exception as e:
56 log.exception("Error processing message: ")
57
58
59 if __name__ == '__main__':
60 main()
61