1 # -*- coding: utf-8 -*-
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
28 from string
import ascii_lowercase
30 from kafka
import KafkaProducer
, KafkaConsumer
31 from n2vc
.vnf
import N2VC
32 from prometheus_client
.core
import GaugeMetricFamily
34 from osm_mon
.common
.common_db_client
import CommonDbClient
35 from osm_mon
.core
.settings
import Config
37 log
= logging
.getLogger(__name__
)
42 cfg
= Config
.instance()
43 self
.kafka_server
= cfg
.BROKER_URI
44 self
.common_db_client
= CommonDbClient()
45 self
.n2vc
= N2VC(server
=cfg
.OSMMON_VCA_HOST
, secret
=cfg
.OSMMON_VCA_SECRET
)
46 self
.producer
= KafkaProducer(bootstrap_servers
=self
.kafka_server
,
47 key_serializer
=str.encode
,
48 value_serializer
=str.encode
)
49 self
.consumer
= KafkaConsumer(bootstrap_servers
=self
.kafka_server
,
50 key_deserializer
=bytes
.decode
,
51 value_deserializer
=bytes
.decode
,
52 consumer_timeout_ms
=10000,
53 group_id
='mon-collector-' + str(uuid
.uuid4()))
54 self
.consumer
.subscribe(['metric_response'])
async def collect_metrics(self):
    """
    Collects vdu metrics. These can be vim and/or n2vc metrics.
    It checks for monitoring-params or metrics inside vdu section of vnfd, then collects the metric accordingly.
    If vim related, it sends a metric read request through Kafka, to be handled by mon-proxy.
    If n2vc related, it uses the n2vc client to obtain the readings.

    :return: the collected GaugeMetricFamily objects (one per metric name), as a dict values view
    :raises Exception: any error hit while collecting is logged with its traceback and re-raised
    """
    # TODO(diazb): Remove dependencies on prometheus_client
    log.debug("collect_metrics")
    label_names = ['ns_id', 'vnf_member_index', 'vdu_name']
    metrics = {}

    def gauge_for(name):
        # One gauge family per metric name; every VDU adds its own labelled sample to it.
        if name not in metrics:
            metrics[name] = GaugeMetricFamily(name, 'OSM metric', labels=label_names)
        return metrics[name]

    try:
        vnfrs = self.common_db_client.get_vnfrs()
        vca_model_name = 'default'
        for vnfr in vnfrs:
            nsr_id = vnfr['nsr-id-ref']
            vnfd = self.common_db_client.get_vnfd(vnfr['vnfd-id'])
            for vdur in vnfr['vdur']:
                # This avoids errors when vdur records have not been completely filled
                if 'name' not in vdur:
                    continue
                # A default is required here: a bare next() would raise StopIteration, which
                # turns into a RuntimeError when it escapes a coroutine.
                vdu = next(
                    (candidate for candidate in vnfd['vdu'] if candidate['id'] == vdur['vdu-id-ref']),
                    None
                )
                if vdu is None:
                    log.warning("No vdu with id %s found in vnfd %s", vdur['vdu-id-ref'], vnfr['vnfd-id'])
                    continue
                vnf_member_index = vnfr['member-vnf-index-ref']
                vdu_name = vdur['name']
                sample_labels = [nsr_id, vnf_member_index, vdu_name]

                if 'monitoring-param' in vdu:
                    for param in vdu['monitoring-param']:
                        metric_name = param['nfvi-metric']
                        payload = await self._generate_read_metric_payload(metric_name, nsr_id, vdu_name,
                                                                           vnf_member_index)
                        self.producer.send(topic='metric_request', key='read_metric_data_request',
                                           value=json.dumps(payload))
                        # send() only queues the record; flush so the request is on the wire
                        # before we start waiting for its response.
                        self.producer.flush()
                        # NOTE: iterating the consumer is a synchronous call inside this coroutine.
                        for message in self.consumer:
                            if message.key != 'read_metric_data_response':
                                continue
                            content = json.loads(message.value)
                            if content['correlation_id'] != payload['correlation_id']:
                                continue
                            series = content['metrics_data']['metrics_series']
                            if series:
                                # Only the most recent reading of the series is exported.
                                gauge_for(metric_name).add_metric(sample_labels, series[-1])
                            # Our response has been handled; stop consuming for this request.
                            break

                if 'vdu-configuration' in vdu and 'metrics' in vdu['vdu-configuration']:
                    vnf_name_vca = await self._generate_vca_vdu_name(vdu_name)
                    vnf_metrics = await self.n2vc.GetMetrics(vca_model_name, vnf_name_vca)
                    log.debug('VNF Metrics: %s', vnf_metrics)
                    for vnf_metric_list in vnf_metrics.values():
                        for vnf_metric in vnf_metric_list:
                            log.debug("VNF Metric: %s", vnf_metric)
                            gauge_for(vnf_metric['key']).add_metric(sample_labels,
                                                                    float(vnf_metric['value']))
        log.debug("metric.values = %s", metrics.values())
        return metrics.values()
    except Exception:
        log.exception("Error collecting metrics")
        # Propagate after logging; falling through would hand callers None instead of metrics.
        raise
async def _generate_vca_vdu_name(vdu_name) -> str:
    """
    Replaces all digits in vdu name for corresponding ascii characters. This is the format required by N2VC.

    Each decimal digit d becomes the d-th lowercase ASCII letter (0 -> 'a', 1 -> 'b', ...), and a
    trailing "-<lowercase letters>" suffix, if present after that substitution, is removed.

    :param vdu_name: Vdu name according to the vdur
    :return: Name with digits replaced with characters
    """
    # isdecimal() rather than isdigit(): isdigit() also accepts characters such as superscript
    # digits, which int() cannot parse and would make the conversion raise ValueError.
    vnf_name_vca = ''.join(
        ascii_lowercase[int(char)] if char.isdecimal() else char
        for char in vdu_name
    )
    return re.sub(r'-[a-z]+$', '', vnf_name_vca)
139 async def _generate_read_metric_payload(metric_name
, nsr_id
, vdu_name
, vnf_member_index
) -> dict:
141 Builds JSON payload for asking for a metric measurement in MON. It follows the model defined in core.models.
142 :param metric_name: OSM metric name (e.g.: cpu_utilization)
143 :param nsr_id: NSR ID
144 :param vdu_name: Vdu name according to the vdur
145 :param vnf_member_index: Index of the VNF in the NS according to the vnfr
146 :return: JSON payload as dict
148 cor_id
= random
.randint(1, 10e7
)
150 'correlation_id': cor_id
,
151 'metric_name': metric_name
,
153 'vnf_member_index': vnf_member_index
,
154 'vdu_name': vdu_name
,
155 'collection_period': 1,
156 'collection_unit': 'DAY',