blob: b7893e0ac9c302479f9a40de940c1f4f1433767b [file] [log] [blame]
# -*- coding: utf-8 -*-
# Copyright 2018 Whitestack, LLC
# *************************************************************
# This file is part of OSM Monitoring module
# All Rights Reserved to Whitestack, LLC
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# For those usages not covered by the Apache License, Version 2.0 please
# contact: bdiaz@whitestack.com or glavado@whitestack.com
##
import json
import logging
import random
import threading
import time
import uuid
from kafka import KafkaProducer, KafkaConsumer
from osm_common import dbmongo
from prometheus_client import start_http_server, Gauge
from osm_mon.core.settings import Config
log = logging.getLogger(__name__)
class MonExporter:
    """Collect NFVI metrics over the MON Kafka bus and expose them to Prometheus.

    ``run()`` starts two threads:

    * an HTTP server (port 8000) serving the default prometheus_client registry;
    * a collector loop that periodically walks the VNF records in the OSM
      common database, sends a ``read_metric_data_request`` for every
      monitoring parameter found, and stores the latest reading of each
      response in a per-metric :class:`Gauge`.
    """

    def __init__(self):
        cfg = Config.instance()
        self.kafka_server = cfg.BROKER_URI
        # MONGO_URI is expected to look like "<host>:<port>"; split once.
        mongo_parts = cfg.MONGO_URI.split(':')
        self.common_db_host = mongo_parts[0]
        self.common_db_port = mongo_parts[1]
        # Seconds to sleep between collection passes.
        self.collector_interval = 5
        # Gauges indexed by metric name, created lazily on first reading.
        self.metrics = {}

    def _run_exporter(self):
        """Serve the Prometheus metrics endpoint on port 8000 (blocking)."""
        start_http_server(8000)

    def _run_collector(self):
        """Endless collection loop: request metrics over Kafka and update gauges.

        Any error in a single pass is logged and the loop continues, so a
        transient bus or database failure does not kill the collector thread.
        """
        producer = KafkaProducer(bootstrap_servers=self.kafka_server,
                                 key_serializer=str.encode,
                                 value_serializer=str.encode)
        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
                                 key_deserializer=bytes.decode,
                                 value_deserializer=bytes.decode,
                                 consumer_timeout_ms=10000,
                                 group_id='mon-collector-' + str(uuid.uuid4()))
        consumer.subscribe(['metric_response'])
        common_db = dbmongo.DbMongo()
        common_db.db_connect({'host': self.common_db_host, 'port': int(self.common_db_port), 'name': 'osm'})
        while True:
            try:
                time.sleep(self.collector_interval)
                self._collect_once(common_db, producer, consumer)
            except Exception:
                log.exception("Error collecting metrics: ")

    def _collect_once(self, common_db, producer, consumer):
        """Run one collection pass over every VDU of every VNF record."""
        vnfrs = common_db.get_list('vnfrs')
        for vnfr in vnfrs:
            vnfd = common_db.get_one('vnfds', {"_id": vnfr['vnfd-id']})
            for vdur in vnfr['vdur']:
                # Match the runtime VDU record to its descriptor; a missing
                # descriptor entry used to raise StopIteration and abort the
                # whole pass — now the record is skipped instead.
                vdu = next(
                    (candidate for candidate in vnfd['vdu']
                     if candidate['id'] == vdur['vdu-id-ref']),
                    None
                )
                if vdu is None or 'monitoring-param' not in vdu:
                    continue
                for param in vdu['monitoring-param']:
                    self._collect_metric(producer, consumer, vnfr, vdur, param)

    def _collect_metric(self, producer, consumer, vnfr, vdur, param):
        """Request one metric on the bus and wait for its correlated response."""
        metric_name = param['nfvi-metric']
        nsr_id = vnfr['nsr-id-ref']
        vnf_member_index = vnfr['member-vnf-index-ref']
        vdu_name = vdur['name']
        # Correlation id ties the response to this request. 10 ** 8 keeps the
        # original range (10e7 == 1e8) but as an int: a float upper bound is
        # deprecated in random.randint since Python 3.10 and an error in 3.12.
        cor_id = random.randint(1, 10 ** 8)
        payload = {
            'correlation_id': cor_id,
            'metric_name': metric_name,
            'ns_id': nsr_id,
            'vnf_member_index': vnf_member_index,
            'vdu_name': vdu_name,
            'collection_period': 1,
            'collection_unit': 'DAY',
        }
        producer.send(topic='metric_request', key='read_metric_data_request',
                      value=json.dumps(payload))
        producer.flush()
        # Blocks until the matching response arrives or consumer_timeout_ms
        # elapses; non-matching messages on the topic are ignored.
        for message in consumer:
            if message.key == 'read_metric_data_response':
                content = json.loads(message.value)
                if content['correlation_id'] == cor_id and len(
                        content['metrics_data']['metrics_series']):
                    # Keep only the most recent reading of the series.
                    metric_reading = content['metrics_data']['metrics_series'][-1]
                    self._update_gauge(metric_name, nsr_id, vnf_member_index,
                                       vdu_name, metric_reading)
                    break

    def _update_gauge(self, metric_name, nsr_id, vnf_member_index, vdu_name, value):
        """Create the Gauge for *metric_name* on first use and set its value."""
        if metric_name not in self.metrics:
            self.metrics[metric_name] = Gauge(metric_name,
                                              'Metric generated by MON collector',
                                              ['ns_id',
                                               'vnf_member_index',
                                               'vdu_name'])
        self.metrics[metric_name].labels(
            ns_id=nsr_id,
            vnf_member_index=vnf_member_index,
            vdu_name=vdu_name
        ).set(value)

    def run(self):
        """Start the exporter and collector threads (returns immediately)."""
        t1 = threading.Thread(target=self._run_exporter)
        t1.start()
        t2 = threading.Thread(target=self._run_collector)
        t2.start()
if __name__ == '__main__':
    # Script entry point: build the exporter and launch its worker threads.
    exporter = MonExporter()
    exporter.run()