Changes way metrics are collected, removing the use of mon-proxy
[osm/MON.git] / osm_mon / collector / collector.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 import logging
24 import multiprocessing
25 import time
26
27 from osm_mon.collector.backends.prometheus import PrometheusBackend
28 from osm_mon.collector.collectors.juju import VCACollector
29 from osm_mon.collector.collectors.openstack import OpenstackCollector
30 from osm_mon.common.common_db_client import CommonDbClient
31 from osm_mon.core.database import DatabaseManager
32 from osm_mon.core.settings import Config
33
34 log = logging.getLogger(__name__)
35
36 VIM_COLLECTORS = {
37 "openstack": OpenstackCollector
38 }
39
40
41 class Collector:
42 def __init__(self):
43 self.common_db = CommonDbClient()
44 self.producer_timeout = 5
45 self.consumer_timeout = 5
46 self.plugins = []
47 self.database_manager = DatabaseManager()
48 self.database_manager.create_tables()
49
50 def init_plugins(self):
51 prometheus_plugin = PrometheusBackend()
52 self.plugins.append(prometheus_plugin)
53
54 def collect_forever(self):
55 log.debug('collect_forever')
56 cfg = Config.instance()
57 while True:
58 try:
59 self.collect_metrics()
60 time.sleep(cfg.OSMMON_COLLECTOR_INTERVAL)
61 except Exception:
62 log.exception("Error collecting metrics")
63
64 def _get_vim_account_id(self, nsr_id: str, vnf_member_index: int) -> str:
65 vnfr = self.common_db.get_vnfr(nsr_id, vnf_member_index)
66 return vnfr['vim-account-id']
67
68 def _get_vim_type(self, vim_account_id):
69 """Get the vim type that is required by the message."""
70 credentials = self.database_manager.get_credentials(vim_account_id)
71 return credentials.type
72
73 def _init_vim_collector_and_collect(self, vnfr: dict, vim_account_id: str, queue: multiprocessing.Queue):
74 # TODO(diazb) Add support for vrops and aws
75 vim_type = self._get_vim_type(vim_account_id)
76 if vim_type in VIM_COLLECTORS:
77 collector = VIM_COLLECTORS[vim_type](vim_account_id)
78 collector.collect(vnfr, queue)
79 else:
80 log.debug("vimtype %s is not supported.", vim_type)
81
82 def _init_vca_collector_and_collect(self, vnfr: dict, queue: multiprocessing.Queue):
83 vca_collector = VCACollector()
84 vca_collector.collect(vnfr, queue)
85
86 def collect_metrics(self):
87 queue = multiprocessing.Queue()
88 vnfrs = self.common_db.get_vnfrs()
89 processes = []
90 for vnfr in vnfrs:
91 nsr_id = vnfr['nsr-id-ref']
92 vnf_member_index = vnfr['member-vnf-index-ref']
93 vim_account_id = self._get_vim_account_id(nsr_id, vnf_member_index)
94 p = multiprocessing.Process(target=self._init_vim_collector_and_collect,
95 args=(vnfr, vim_account_id, queue))
96 processes.append(p)
97 p.start()
98 p = multiprocessing.Process(target=self._init_vca_collector_and_collect,
99 args=(vnfr, queue))
100 processes.append(p)
101 p.start()
102 for process in processes:
103 process.join()
104 metrics = []
105 while not queue.empty():
106 metrics.append(queue.get())
107 for plugin in self.plugins:
108 plugin.handle(metrics)