f4ffba4853ad9f2a2d0ea0d01c2624ee95a3e098
[osm/MON.git] / osm_mon / collector / collector.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 import logging
24 import multiprocessing
25 import time
26
27 import peewee
28
29 from osm_mon.collector.backends.prometheus import PrometheusBackend
30 from osm_mon.collector.infra_collectors.onos import OnosInfraCollector
31 from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector
32 from osm_mon.collector.vnf_collectors.juju import VCACollector
33 from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
34 from osm_mon.collector.vnf_collectors.vmware import VMwareCollector
35 from osm_mon.collector.vnf_collectors.vio import VIOCollector
36 from osm_mon.core.common_db import CommonDbClient
37 from osm_mon.core.config import Config
38 from osm_mon.core.database import DatabaseManager
39
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# VNF metric collector classes, looked up by VIM type.
VIM_COLLECTORS = {
    "openstack": OpenstackCollector,
    "vmware": VMwareCollector,
    "vio": VIOCollector,
}

# VIM infrastructure collector classes, looked up by VIM type.
VIM_INFRA_COLLECTORS = {"openstack": OpenstackInfraCollector}

# SDN controller infrastructure collector classes, looked up by SDN type.
SDN_INFRA_COLLECTORS = {"onos": OnosInfraCollector}

# Backend classes; Collector._init_backends instantiates each one once.
METRIC_BACKENDS = [PrometheusBackend]
56
57
class Collector:
    """Gather metrics from VIMs, VCA and SDN controllers and pass them to backends.

    Each collection cycle starts one child process per collection task. The
    children push their metrics onto a shared ``multiprocessing.Queue``; the
    parent drains that queue and hands the resulting list to every backend
    plugin.
    """

    # Seconds to wait for each collector process before giving up on it.
    _PROCESS_JOIN_TIMEOUT = 10
    # Seconds between queue drains while waiting for a collector process.
    _QUEUE_POLL_INTERVAL = 0.1

    def __init__(self, config: Config):
        """Set up database clients, the shared metric queue and the backends.

        :param config: MON configuration; passed on to every client and
            collector created by this class.
        """
        self.conf = config
        self.common_db = CommonDbClient(self.conf)
        self.plugins = []
        self.database_manager = DatabaseManager(self.conf)
        self.database_manager.create_tables()
        self.queue = multiprocessing.Queue()
        self._init_backends()

    def collect_forever(self):
        """Run collection cycles forever, pausing the configured interval between them.

        :raises peewee.PeeweeException: database errors are logged and
            re-raised, which ends the loop. Any other exception is logged and
            the loop carries on.
        """
        log.debug('collect_forever')
        while True:
            try:
                self.collect_metrics()
            except peewee.PeeweeException:
                log.exception("Database error consuming message: ")
                raise
            except Exception:
                log.exception("Error collecting metrics")
            # Pause after failed cycles as well; otherwise a persistent error
            # makes the loop retry immediately and flood the log.
            try:
                time.sleep(int(self.conf.get('collector', 'interval')))
            except Exception:
                log.exception("Error waiting for next collection cycle")

    def _enqueue(self, metrics) -> None:
        """Put every metric of the iterable ``metrics`` on the shared queue."""
        for metric in metrics:
            self.queue.put(metric)

    def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str):
        """Collect the metrics of one VNF record from its VIM and enqueue them.

        Used as a child-process target by ``collect_metrics``.

        :param vnfr: VNF record handed to the collector's ``collect``.
        :param vim_account_id: id of the VIM account the VNF is deployed on.
        """
        # TODO(diazb) Add support for aws
        # A new DatabaseManager is built per call rather than reusing
        # self.database_manager (presumably to avoid sharing a DB connection
        # across processes -- confirm before changing).
        database_manager = DatabaseManager(self.conf)
        vim_type = database_manager.get_vim_type(vim_account_id)
        if vim_type not in VIM_COLLECTORS:
            log.debug("vimtype %s is not supported.", vim_type)
            return
        collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id)
        self._enqueue(collector.collect(vnfr))

    def _collect_vim_infra_metrics(self, vim_account_id: str):
        """Collect infrastructure metrics for one VIM account and enqueue them.

        Used as a child-process target by ``collect_metrics``.

        :param vim_account_id: id of the VIM account to inspect.
        """
        database_manager = DatabaseManager(self.conf)
        vim_type = database_manager.get_vim_type(vim_account_id)
        if vim_type not in VIM_INFRA_COLLECTORS:
            log.debug("vimtype %s is not supported.", vim_type)
            return
        collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id)
        self._enqueue(collector.collect())

    def _collect_sdnc_infra_metrics(self, sdnc_id: str):
        """Collect infrastructure metrics for one SDN controller and enqueue them.

        Used as a child-process target by ``collect_metrics``.

        :param sdnc_id: id of the SDN controller to inspect.
        """
        common_db = CommonDbClient(self.conf)
        sdn_type = common_db.get_sdnc(sdnc_id)['type']
        if sdn_type not in SDN_INFRA_COLLECTORS:
            log.debug("sdn_type %s is not supported.", sdn_type)
            return
        collector = SDN_INFRA_COLLECTORS[sdn_type](self.conf, sdnc_id)
        self._enqueue(collector.collect())

    def _collect_vca_metrics(self, vnfr: dict):
        """Collect VCA metrics for one VNF record and enqueue them.

        Used as a child-process target by ``collect_metrics``.

        :param vnfr: VNF record handed to ``VCACollector.collect``.
        """
        log.debug('_collect_vca_metrics')
        log.debug('vnfr: %s', vnfr)
        vca_collector = VCACollector(self.conf)
        self._enqueue(vca_collector.collect(vnfr))

    @staticmethod
    def _spawn(target, *args):
        """Start a child process running ``target(*args)`` and return it."""
        process = multiprocessing.Process(target=target, args=args)
        process.start()
        return process

    def _drain_queue(self, metrics: list) -> None:
        """Move every metric currently readable from the queue into ``metrics``."""
        while not self.queue.empty():
            metrics.append(self.queue.get())

    def _join_and_drain(self, processes) -> list:
        """Wait for the collector processes while emptying the shared queue.

        A child that has put data on a ``multiprocessing.Queue`` cannot exit
        until that data has been flushed to the underlying pipe, so joining
        before reading can stall until the timeout. The queue is therefore
        drained repeatedly while each process is awaited, and once more at the
        end.

        :param processes: started ``multiprocessing.Process`` objects.
        :return: list of all metrics read from the queue.
        """
        metrics = []
        for process in processes:
            deadline = time.monotonic() + self._PROCESS_JOIN_TIMEOUT
            while True:
                self._drain_queue(metrics)
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    if process.is_alive():
                        # Not terminated on purpose: killing a process that is
                        # using a queue may corrupt the queue.
                        log.warning("Collector process %s did not finish within %s seconds",
                                    process.name, self._PROCESS_JOIN_TIMEOUT)
                    break
                process.join(timeout=min(remaining, self._QUEUE_POLL_INTERVAL))
                if not process.is_alive():
                    break
        # Pick up whatever finished processes flushed after the last drain.
        self._drain_queue(metrics)
        return metrics

    def collect_metrics(self):
        """Run one collection cycle and pass the gathered metrics to every backend.

        Starts one process per VNF record for VIM metrics and one for VCA
        metrics, one per VIM account for infrastructure metrics and one per
        SDN controller. Each process is awaited for at most
        ``_PROCESS_JOIN_TIMEOUT`` seconds.
        """
        processes = []
        for vnfr in self.common_db.get_vnfrs():
            nsr_id = vnfr['nsr-id-ref']
            vnf_member_index = vnfr['member-vnf-index-ref']
            vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index)
            processes.append(self._spawn(self._collect_vim_metrics, vnfr, vim_account_id))
            processes.append(self._spawn(self._collect_vca_metrics, vnfr))
        for vim in self.common_db.get_vim_accounts():
            processes.append(self._spawn(self._collect_vim_infra_metrics, vim['_id']))
        for sdnc in self.common_db.get_sdncs():
            processes.append(self._spawn(self._collect_sdnc_infra_metrics, sdnc['_id']))

        metrics = self._join_and_drain(processes)
        for plugin in self.plugins:
            plugin.handle(metrics)

    def _init_backends(self):
        """Instantiate every class in ``METRIC_BACKENDS`` and register it as a plugin."""
        for backend in METRIC_BACKENDS:
            self.plugins.append(backend())