Adds MON Prometheus exporter
[osm/MON.git] / osm_mon / exporter / exporter.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 import json
24 import logging
25 import random
26 import threading
27 import time
28 import uuid
29
30 from kafka import KafkaProducer, KafkaConsumer
31 from osm_common import dbmongo
32 from prometheus_client import start_http_server, Gauge
33
34 from osm_mon.core.settings import Config
35
36 log = logging.getLogger(__name__)
37
38
class MonExporter:
    """Publish metrics gathered by the MON collector as Prometheus gauges.

    ``run`` starts two threads: one serves the Prometheus HTTP endpoint, the
    other periodically walks the ``vnfrs`` collection of the common database,
    requests every declared ``monitoring-param`` over Kafka and stores the
    latest reading in a gauge labelled by NS id, VNF member index and VDU
    name.
    """

    def __init__(self):
        """Read the Kafka broker and MongoDB locations from ``Config``."""
        cfg = Config.instance()
        self.kafka_server = cfg.BROKER_URI
        # MONGO_URI is split on ':'; the first two fields are host and port.
        mongo_fields = cfg.MONGO_URI.split(':')
        self.common_db_host = mongo_fields[0]
        self.common_db_port = mongo_fields[1]
        # Passed to time.sleep() before every collection cycle.
        self.collector_interval = 5
        # Maps metric name -> Gauge, so each gauge is created only once.
        self.metrics = {}

    def _run_exporter(self):
        """Start the Prometheus HTTP server on port 8000."""
        start_http_server(8000)

    @staticmethod
    def _new_correlation_id():
        """Return a random integer in [1, 10**8] used to pair a response with its request.

        The bound is an ``int`` on purpose: ``random.randint`` rejects float
        arguments such as ``10e7`` on Python 3.12+.
        """
        return random.randint(1, 10 ** 8)

    @staticmethod
    def _build_metric_request(cor_id, metric_name, nsr_id, vnf_member_index, vdu_name):
        """Return the ``read_metric_data_request`` payload for one metric of one VDU."""
        return {
            'correlation_id': cor_id,
            'metric_name': metric_name,
            'ns_id': nsr_id,
            'vnf_member_index': vnf_member_index,
            'vdu_name': vdu_name,
            'collection_period': 1,
            'collection_unit': 'DAY',
        }

    @staticmethod
    def _find_vdu(vnfd, vdu_id):
        """Return the entry of ``vnfd['vdu']`` whose ``id`` equals *vdu_id*, or ``None``."""
        return next((vdu for vdu in vnfd['vdu'] if vdu['id'] == vdu_id), None)

    @staticmethod
    def _await_metric_reading(consumer, cor_id):
        """Return the last value of the first non-empty series answering *cor_id*.

        Iterates *consumer* and ignores messages whose key is not
        ``read_metric_data_response`` or whose correlation id differs.
        Returns ``None`` when the iteration ends without such a response
        (for the Kafka consumer created in ``_run_collector`` that is
        governed by its ``consumer_timeout_ms`` setting).
        """
        for message in consumer:
            if message.key != 'read_metric_data_response':
                continue
            content = json.loads(message.value)
            if content['correlation_id'] != cor_id:
                continue
            series = content['metrics_data']['metrics_series']
            # A matching response with an empty series is skipped and the
            # wait continues, as in the original implementation.
            if series:
                return series[-1]
        return None

    def _publish_reading(self, metric_name, nsr_id, vnf_member_index, vdu_name, reading):
        """Set the gauge for *metric_name* (created on first use) to *reading*."""
        gauge = self.metrics.get(metric_name)
        if gauge is None:
            gauge = Gauge(metric_name,
                          'Metric generated by MON collector',
                          ['ns_id', 'vnf_member_index', 'vdu_name'])
            self.metrics[metric_name] = gauge
        gauge.labels(
            ns_id=nsr_id,
            vnf_member_index=vnf_member_index,
            vdu_name=vdu_name
        ).set(reading)

    def _collect_vnfr_metrics(self, common_db, producer, consumer, vnfr):
        """Request and publish every monitoring parameter declared for one VNF record."""
        vnfd = common_db.get_one('vnfds', {"_id": vnfr['vnfd-id']})
        for vdur in vnfr['vdur']:
            vdu = self._find_vdu(vnfd, vdur['vdu-id-ref'])
            if vdu is None:
                # Skip only this record instead of aborting the whole cycle.
                log.warning("VDU %s not found in VNFD %s; skipping",
                            vdur['vdu-id-ref'], vnfr['vnfd-id'])
                continue
            for param in vdu.get('monitoring-param', ()):
                metric_name = param['nfvi-metric']
                nsr_id = vnfr['nsr-id-ref']
                vnf_member_index = vnfr['member-vnf-index-ref']
                vdu_name = vdur['name']
                cor_id = self._new_correlation_id()
                payload = self._build_metric_request(
                    cor_id, metric_name, nsr_id, vnf_member_index, vdu_name)
                producer.send(topic='metric_request', key='read_metric_data_request',
                              value=json.dumps(payload))
                producer.flush()
                reading = self._await_metric_reading(consumer, cor_id)
                if reading is not None:
                    self._publish_reading(metric_name, nsr_id, vnf_member_index,
                                          vdu_name, reading)

    def _run_collector(self):
        """Poll the common database forever and refresh the gauges.

        Any exception raised during a cycle is logged and the loop goes on
        with the next cycle.
        """
        producer = KafkaProducer(bootstrap_servers=self.kafka_server,
                                 key_serializer=str.encode,
                                 value_serializer=str.encode)
        # A fresh group id per run: uuid4 makes the consumer group unique.
        consumer = KafkaConsumer(bootstrap_servers=self.kafka_server,
                                 key_deserializer=bytes.decode,
                                 value_deserializer=bytes.decode,
                                 consumer_timeout_ms=10000,
                                 group_id='mon-collector-' + str(uuid.uuid4()))
        consumer.subscribe(['metric_response'])
        common_db = dbmongo.DbMongo()
        common_db.db_connect({'host': self.common_db_host,
                              'port': int(self.common_db_port),
                              'name': 'osm'})

        while True:
            try:
                time.sleep(self.collector_interval)
                for vnfr in common_db.get_list('vnfrs'):
                    self._collect_vnfr_metrics(common_db, producer, consumer, vnfr)
            except Exception:
                log.exception("Error collecting metrics: ")

    def run(self):
        """Start the exporter thread and the collector thread."""
        t1 = threading.Thread(target=self._run_exporter)
        t1.start()
        t2 = threading.Thread(target=self._run_collector)
        t2.start()
122
123
# Script entry point: build the exporter and start its worker threads.
if __name__ == '__main__':
    exporter = MonExporter()
    exporter.run()