X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=inline;f=installers%2Fdocker%2Fosm_metrics%2Fkafka-exporter%2Fmon_to_kafka_exporter.py;fp=installers%2Fdocker%2Fosm_metrics%2Fkafka-exporter%2Fmon_to_kafka_exporter.py;h=0000000000000000000000000000000000000000;hb=6021624aec365ab77c770b5880f40b4f711624e2;hp=e3dbf0ed7bf5b23a21571a4f4c027796979c481d;hpb=023a6c145bed7e7bcb215c061c59efb8b287a50f;p=osm%2Fdevops.git diff --git a/installers/docker/osm_metrics/kafka-exporter/mon_to_kafka_exporter.py b/installers/docker/osm_metrics/kafka-exporter/mon_to_kafka_exporter.py deleted file mode 100644 index e3dbf0ed..00000000 --- a/installers/docker/osm_metrics/kafka-exporter/mon_to_kafka_exporter.py +++ /dev/null @@ -1,61 +0,0 @@ -from kafka import KafkaConsumer, KafkaProducer -from kafka.errors import KafkaError -import logging -import yaml -import json -import sys -import re -import datetime -import time - -logging.basicConfig(stream=sys.stdout, - format='%(asctime)s %(message)s', - datefmt='%m/%d/%Y %I:%M:%S %p', - level=logging.INFO) -log = logging.getLogger(__name__) - - -def main(): - if len(sys.argv) <= 1: - print ("Usage: metric-transformer.py kafka_server") - exit() - kafka_server = sys.argv.pop(1) - kafka_host = kafka_server.split(':')[0] - kafka_port = kafka_server.split(':')[1] - transform_messages(kafka_host=kafka_host, - kafka_port=kafka_port) - - -def transform_messages(kafka_host, kafka_port): - bootstrap_servers = '{}:{}'.format(kafka_host, kafka_port) - producer = KafkaProducer(bootstrap_servers=bootstrap_servers, - key_serializer=str.encode, - value_serializer=str.encode) - consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, - key_deserializer=str.encode, - value_deserializer=str.encode) - consumer.subscribe(["metric_response"]) - for message in consumer: - try: - if message.topic == "metric_response": - if message.key == "read_metric_data_response": - values = json.loads(message.value) - new_msg = { - 'name': values['metric_name'], - 'value': values['metrics_data']['metrics_series'][-1], - 'labels': { - 'resource_uuid': values['resource_uuid'] - } - } - log.info("Message to kafka exporter: %s", new_msg) - future = producer.send(topic='kafka_exporter_topic', key='kafka-exporter-key', - value=json.dumps(new_msg)) - response = future.get() - log.info("Response from Kafka: %s", response) - except Exception as e: - log.exception("Error processing message: ") - - -if __name__ == '__main__': - main() -