5074308600fb9be4ae381104c424098523f91b79
[osm/MON.git] / osm_mon / collector / collector.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 import logging
24 import multiprocessing
25 import time
26
27 import peewee
28
29 from osm_mon.collector.backends.prometheus import PrometheusBackend
30 from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector
31 from osm_mon.collector.vnf_collectors.juju import VCACollector
32 from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
33 from osm_mon.collector.vnf_collectors.vmware import VMwareCollector
34 from osm_mon.collector.vnf_collectors.vio import VIOCollector
35 from osm_mon.core.common_db import CommonDbClient
36 from osm_mon.core.config import Config
37 from osm_mon.core.database import DatabaseManager
38
log = logging.getLogger(__name__)

# VNF metric collector class to use for each supported VIM type name.
VIM_COLLECTORS = dict(
    openstack=OpenstackCollector,
    vmware=VMwareCollector,
    vio=VIOCollector,
)
# Infrastructure metric collector class to use for each supported VIM type name.
VIM_INFRA_COLLECTORS = dict(openstack=OpenstackInfraCollector)
# Backend classes that receive the collected metrics; one instance of each is
# created per Collector.
METRIC_BACKENDS = [PrometheusBackend]
52
53
class Collector:
    """Collect VNF, VCA and VIM infrastructure metrics and pass them to the metric backends.

    Every collection cycle spawns one child process per collection task. The
    children push their metrics onto a shared ``multiprocessing.Queue``; the
    parent drains that queue and hands the resulting list to every backend
    listed in ``METRIC_BACKENDS``.
    """

    # Maximum number of seconds to wait for each child process per cycle.
    _PROCESS_JOIN_TIMEOUT = 10
    # Seconds to wait on a child between two drains of the shared queue.
    _DRAIN_POLL_INTERVAL = 0.1

    def __init__(self, config: Config):
        """Set up the database clients, the shared metric queue and the backends.

        :param config: MON configuration; must provide the ``collector`` /
            ``interval`` setting read by :meth:`collect_forever`.
        """
        self.conf = config
        self.common_db = CommonDbClient(self.conf)
        self.plugins = []
        self.database_manager = DatabaseManager(self.conf)
        self.database_manager.create_tables()
        self.queue = multiprocessing.Queue()
        self._init_backends()

    def collect_forever(self):
        """Run :meth:`collect_metrics` in an endless loop.

        After every cycle the loop sleeps for the number of seconds configured
        under ``collector`` / ``interval``. The pause also applies after a
        failed cycle, so a persistent error does not turn into a busy loop.

        :raises peewee.PeeweeException: database errors are logged and
            re-raised, which ends the loop. Any other exception raised by a
            collection cycle is logged and the loop carries on.
        """
        log.debug('collect_forever')
        while True:
            try:
                self.collect_metrics()
            except peewee.PeeweeException:
                log.exception("Database error consuming message: ")
                raise
            except Exception:
                log.exception("Error collecting metrics")
            # Kept outside the try block on purpose: the pause must happen
            # after failed cycles too. A missing or invalid interval setting
            # propagates instead of being retried without any delay.
            time.sleep(int(self.conf.get('collector', 'interval')))

    def _enqueue(self, metrics):
        """Put every item of ``metrics`` on the shared queue."""
        for metric in metrics:
            self.queue.put(metric)

    def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str):
        """Collect the metrics of one VNF record from its VIM and enqueue them.

        Used as the target of a child process by :meth:`collect_metrics`.

        :param vnfr: VNF record handed to the collector's ``collect`` method.
        :param vim_account_id: id of the VIM account the VNF is deployed on.
        """
        # TODO(diazb) Add support for aws
        # NOTE(review): a fresh DatabaseManager is built here instead of
        # reusing self.database_manager, presumably because this runs in a
        # child process - confirm before changing.
        database_manager = DatabaseManager(self.conf)
        vim_type = database_manager.get_vim_type(vim_account_id)
        if vim_type not in VIM_COLLECTORS:
            log.debug("vimtype %s is not supported.", vim_type)
            return
        collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id)
        self._enqueue(collector.collect(vnfr))

    def _collect_vim_infra_metrics(self, vim_account_id: str):
        """Collect the infrastructure metrics of one VIM account and enqueue them.

        Used as the target of a child process by :meth:`collect_metrics`.

        :param vim_account_id: id of the VIM account to inspect.
        """
        database_manager = DatabaseManager(self.conf)
        vim_type = database_manager.get_vim_type(vim_account_id)
        if vim_type not in VIM_INFRA_COLLECTORS:
            log.debug("vimtype %s is not supported.", vim_type)
            return
        collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id)
        self._enqueue(collector.collect())

    def _collect_vca_metrics(self, vnfr: dict):
        """Collect the VCA metrics of one VNF record and enqueue them.

        Used as the target of a child process by :meth:`collect_metrics`.

        :param vnfr: VNF record handed to ``VCACollector.collect``.
        """
        log.debug('_collect_vca_metrics')
        log.debug('vnfr: %s', vnfr)
        vca_collector = VCACollector(self.conf)
        self._enqueue(vca_collector.collect(vnfr))

    @staticmethod
    def _spawn(target, *args) -> multiprocessing.Process:
        """Start a child process running ``target(*args)`` and return it."""
        process = multiprocessing.Process(target=target, args=args)
        process.start()
        return process

    def _drain_queue(self) -> list:
        """Return every metric currently readable from the shared queue."""
        drained = []
        while not self.queue.empty():
            drained.append(self.queue.get())
        return drained

    def collect_metrics(self):
        """Run one collection cycle and pass the gathered metrics to the backends.

        For every VNF record returned by the common database, one child
        process collects VIM metrics and another collects VCA metrics. For
        every VIM account, one child process collects infrastructure metrics.
        Each child is given up to ``_PROCESS_JOIN_TIMEOUT`` seconds, then
        every plugin's ``handle`` method is called with the list of metrics.
        """
        processes = []
        for vnfr in self.common_db.get_vnfrs():
            nsr_id = vnfr['nsr-id-ref']
            vnf_member_index = vnfr['member-vnf-index-ref']
            vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index)
            processes.append(self._spawn(self._collect_vim_metrics, vnfr, vim_account_id))
            processes.append(self._spawn(self._collect_vca_metrics, vnfr))
        for vim in self.common_db.get_vim_accounts():
            processes.append(self._spawn(self._collect_vim_infra_metrics, vim['_id']))

        metrics = []
        for process in processes:
            deadline = time.monotonic() + self._PROCESS_JOIN_TIMEOUT
            # Drain while waiting: a child that has put data on the queue
            # cannot exit until that data has been read, so joining first
            # would stall for the whole timeout.
            while process.is_alive() and time.monotonic() < deadline:
                metrics.extend(self._drain_queue())
                process.join(timeout=self._DRAIN_POLL_INTERVAL)
            if process.is_alive():
                # The child is left running rather than terminated, because
                # killing a process that uses the shared queue can corrupt
                # it. Whatever it enqueues later is picked up by a later drain.
                log.warning("Collector process %s did not finish within %s seconds",
                            process.name, self._PROCESS_JOIN_TIMEOUT)
        # Pick up whatever arrived after the last in-loop drain.
        metrics.extend(self._drain_queue())

        for plugin in self.plugins:
            plugin.handle(metrics)

    def _init_backends(self):
        """Instantiate every class in ``METRIC_BACKENDS`` and register it as a plugin."""
        for backend in METRIC_BACKENDS:
            self.plugins.append(backend())