X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;ds=inline;f=osm_mon%2Fexporter%2Fexporter.py;fp=osm_mon%2Fexporter%2Fexporter.py;h=0000000000000000000000000000000000000000;hb=4da146638bc3838270fa41c9f9fb91961f726c97;hp=b7893e0ac9c302479f9a40de940c1f4f1433767b;hpb=2aec92e1eb52d5512c2acae9ce9878f2f3c8f782;p=osm%2FMON.git diff --git a/osm_mon/exporter/exporter.py b/osm_mon/exporter/exporter.py deleted file mode 100644 index b7893e0..0000000 --- a/osm_mon/exporter/exporter.py +++ /dev/null @@ -1,125 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright 2018 Whitestack, LLC -# ************************************************************* - -# This file is part of OSM Monitoring module -# All Rights Reserved to Whitestack, LLC - -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# For those usages not covered by the Apache License, Version 2.0 please -# contact: bdiaz@whitestack.com or glavado@whitestack.com -## -import json -import logging -import random -import threading -import time -import uuid - -from kafka import KafkaProducer, KafkaConsumer -from osm_common import dbmongo -from prometheus_client import start_http_server, Gauge - -from osm_mon.core.settings import Config - -log = logging.getLogger(__name__) - - -class MonExporter: - - def __init__(self): - cfg = Config.instance() - self.kafka_server = cfg.BROKER_URI - self.common_db_host = cfg.MONGO_URI.split(':')[0] - self.common_db_port = cfg.MONGO_URI.split(':')[1] - self.collector_interval = 5 - self.metrics = {} - - def _run_exporter(self): - start_http_server(8000) - - def _run_collector(self): - producer = KafkaProducer(bootstrap_servers=self.kafka_server, - key_serializer=str.encode, - value_serializer=str.encode) - consumer = KafkaConsumer(bootstrap_servers=self.kafka_server, - key_deserializer=bytes.decode, - value_deserializer=bytes.decode, - consumer_timeout_ms=10000, - group_id='mon-collector-' + str(uuid.uuid4())) - consumer.subscribe(['metric_response']) - common_db = dbmongo.DbMongo() - common_db.db_connect({'host': self.common_db_host, 'port': int(self.common_db_port), 'name': 'osm'}) - - while True: - try: - time.sleep(self.collector_interval) - vnfrs = common_db.get_list('vnfrs') - for vnfr in vnfrs: - vnfd = common_db.get_one('vnfds', {"_id": vnfr['vnfd-id']}) - for vdur in vnfr['vdur']: - vdu = next( - filter(lambda vdu: vdu['id'] == vdur['vdu-id-ref'], vnfd['vdu']) - ) - if 'monitoring-param' in vdu: - for param in vdu['monitoring-param']: - metric_name = param['nfvi-metric'] - nsr_id = vnfr['nsr-id-ref'] - vnf_member_index = vnfr['member-vnf-index-ref'] - vdu_name = vdur['name'] - cor_id = random.randint(1, 10e7) - payload = { - 'correlation_id': cor_id, - 'metric_name': metric_name, - 'ns_id': nsr_id, - 'vnf_member_index': vnf_member_index, - 'vdu_name': vdu_name, - 'collection_period': 1, - 'collection_unit': 'DAY', - } - producer.send(topic='metric_request', key='read_metric_data_request', - value=json.dumps(payload)) - producer.flush() - for message in consumer: - if message.key == 'read_metric_data_response': - content = json.loads(message.value) - if content['correlation_id'] == cor_id and len( - content['metrics_data']['metrics_series']): - metric_reading = content['metrics_data']['metrics_series'][-1] - if metric_name not in self.metrics.keys(): - self.metrics[metric_name] = Gauge(metric_name, - 'Metric generated by MON collector', - ['ns_id', - 'vnf_member_index', - 'vdu_name']) - self.metrics[metric_name].labels( - ns_id=nsr_id, - vnf_member_index=vnf_member_index, - vdu_name=vdu_name - ).set(metric_reading) - break - - - except Exception: - log.exception("Error collecting metrics: ") - - def run(self): - t1 = threading.Thread(target=self._run_exporter) - t1.start() - t2 = threading.Thread(target=self._run_collector) - t2.start() - - -if __name__ == '__main__': - MonExporter().run()