9bf395334e3212dbf423f98c10a0b9fb7cf7c69a
1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
27 from collections
import Iterable
29 from kafka
import KafkaProducer
, KafkaConsumer
30 from osm_common
import dbmongo
31 from prometheus_client
.core
import GaugeMetricFamily
33 from osm_mon
.core
.settings
import Config
35 log
= logging
.getLogger(__name__
)
40 cfg
= Config
.instance()
41 self
.kafka_server
= cfg
.BROKER_URI
42 self
.common_db_host
= cfg
.MONGO_URI
.split(':')[0]
43 self
.common_db_port
= cfg
.MONGO_URI
.split(':')[1]
44 self
.common_db
= dbmongo
.DbMongo()
45 self
.common_db
.db_connect({'host': self
.common_db_host
, 'port': int(self
.common_db_port
), 'name': 'osm'})
46 self
.producer
= KafkaProducer(bootstrap_servers
=self
.kafka_server
,
47 key_serializer
=str.encode
,
48 value_serializer
=str.encode
)
49 self
.consumer
= KafkaConsumer(bootstrap_servers
=self
.kafka_server
,
50 key_deserializer
=bytes
.decode
,
51 value_deserializer
=bytes
.decode
,
52 consumer_timeout_ms
=10000,
53 group_id
='mon-collector-' + str(uuid
.uuid4()))
54 self
.consumer
.subscribe(['metric_response'])
56 def collect_metrics(self
) -> Iterable
:
57 # TODO(diazb): Remove dependencies on prometheus_client
58 log
.debug("collect_metrics")
60 vnfrs
= self
.common_db
.get_list('vnfrs')
62 nsr_id
= vnfr
['nsr-id-ref']
63 vnfd
= self
.common_db
.get_one('vnfds', {"_id": vnfr
['vnfd-id']})
64 payloads
= self
._generate
_metric
_data
_payloads
(vnfr
, vnfd
)
65 for payload
in payloads
:
66 cor_id
= payload
['correlation_id']
67 metric_name
= payload
['metric_name']
68 vnf_member_index
= payload
['vnf_member_index']
69 vdu_name
= payload
['vdu_name']
70 self
.producer
.send(topic
='metric_request', key
='read_metric_data_request',
71 value
=json
.dumps(payload
))
73 for message
in self
.consumer
:
74 if message
.key
== 'read_metric_data_response':
75 content
= json
.loads(message
.value
)
76 if content
['correlation_id'] == cor_id
:
77 if len(content
['metrics_data']['metrics_series']):
78 metric_reading
= content
['metrics_data']['metrics_series'][-1]
79 if metric_name
not in metrics
.keys():
80 metrics
[metric_name
] = GaugeMetricFamily(
83 labels
=['ns_id', 'vnf_member_index', 'vdu_name']
85 metrics
[metric_name
].add_metric([nsr_id
, vnf_member_index
, vdu_name
],
88 return metrics
.values()
91 def _generate_metric_data_payloads(vnfr
: dict, vnfd
: dict) -> list:
92 log
.debug('_generate_metric_data_payloads')
94 nsr_id
= vnfr
['nsr-id-ref']
95 for vdur
in vnfr
['vdur']:
96 # This avoids errors when vdur records have not been completely filled
97 if 'name' not in vdur
:
100 filter(lambda vdu
: vdu
['id'] == vdur
['vdu-id-ref'], vnfd
['vdu'])
102 if 'monitoring-param' in vdu
:
103 for param
in vdu
['monitoring-param']:
104 metric_name
= param
['nfvi-metric']
105 vnf_member_index
= vnfr
['member-vnf-index-ref']
106 vdu_name
= vdur
['name']
107 cor_id
= random
.randint(1, 10e7
)
109 'correlation_id': cor_id
,
110 'metric_name': metric_name
,
112 'vnf_member_index': vnf_member_index
,
113 'vdu_name': vdu_name
,
114 'collection_period': 1,
115 'collection_unit': 'DAY',
117 payloads
.append(payload
)