aad395a15a2eabb3004331680df3c92679e2a5c3
[osm/MON.git] / osm_mon / collector / collector.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23 import logging
24 import multiprocessing
25 import time
26
27 import peewee
28
29 from osm_mon.collector.backends.prometheus import PrometheusBackend
30 from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector
31 from osm_mon.collector.metric import Metric
32 from osm_mon.collector.vnf_collectors.juju import VCACollector
33 from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
34 from osm_mon.collector.vnf_collectors.vmware import VMwareCollector
35 from osm_mon.core.common_db import CommonDbClient
36 from osm_mon.core.config import Config
37 from osm_mon.core.database import DatabaseManager
38
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# VNF metric collector classes, looked up by VIM type in
# Collector._collect_vim_metrics.
VIM_COLLECTORS = dict(
    openstack=OpenstackCollector,
    vmware=VMwareCollector,
)

# VIM infrastructure (status) collector classes, looked up by VIM type in
# Collector._collect_vim_infra_metrics.
VIM_INFRA_COLLECTORS = dict(
    openstack=OpenstackInfraCollector,
)

# Backend classes; Collector._init_backends instantiates each one.
METRIC_BACKENDS = [PrometheusBackend]
51
52
class Collector:
    """Gather metrics in child processes and hand them to the backends.

    Every collection cycle starts one child process per collection task:
    VIM metrics and VCA metrics for each VNFR, plus one infrastructure
    status check per VIM account. The children put their results on a
    shared ``multiprocessing.Queue``; the parent drains that queue and
    passes the resulting list to every backend in ``self.plugins``.
    """

    def __init__(self, config: Config):
        """Create the DB clients, the shared result queue and the backends.

        :param config: configuration object; it is forwarded to the DB
            clients and collectors and queried for ``collector.interval``.
        """
        self.conf = config
        self.common_db = CommonDbClient(self.conf)
        self.plugins = []
        self.database_manager = DatabaseManager(self.conf)
        self.database_manager.create_tables()
        self.queue = multiprocessing.Queue()
        self._init_backends()

    def collect_forever(self):
        """Run collection cycles in an endless loop.

        After every cycle that does not end the loop, successful or not,
        the method sleeps for ``collector.interval`` seconds. The value is
        read from the config on each iteration, outside the ``try`` block,
        so an error while reading or converting it propagates to the caller.

        :raises peewee.PeeweeException: database errors are logged and
            re-raised, which terminates the loop.
        """
        log.debug('collect_forever')
        while True:
            try:
                self.collect_metrics()
            except peewee.PeeweeException:
                log.exception("Database error consuming message: ")
                raise
            except Exception:
                log.exception("Error collecting metrics")
            # Sleep outside the try block: a failed cycle must also wait for
            # the interval, otherwise a persistent error turns this loop into
            # a tight retry loop that floods the log.
            time.sleep(int(self.conf.get('collector', 'interval')))

    def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str):
        """Collect the VIM metrics of one VNFR and queue them.

        Used as the target of a child process by ``collect_metrics``.

        :param vnfr: VNF record passed to the collector's ``collect``.
        :param vim_account_id: id of the VIM account; used to look up the
            VIM type and to build the collector.
        """
        # TODO(diazb) Add support for vrops and aws
        # NOTE(review): a new DatabaseManager is built here instead of using
        # self.database_manager, presumably because this runs in a child
        # process - confirm.
        database_manager = DatabaseManager(self.conf)
        vim_type = database_manager.get_vim_type(vim_account_id)
        if vim_type not in VIM_COLLECTORS:
            log.debug("vimtype %s is not supported.", vim_type)
            return
        collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id)
        for metric in collector.collect(vnfr):
            self.queue.put(metric)

    def _collect_vim_infra_metrics(self, vim_account_id: str):
        """Check one VIM account's status and queue it as ``vim_status``.

        Used as the target of a child process by ``collect_metrics``.

        :param vim_account_id: id of the VIM account to check.
        """
        database_manager = DatabaseManager(self.conf)
        vim_type = database_manager.get_vim_type(vim_account_id)
        if vim_type not in VIM_INFRA_COLLECTORS:
            log.debug("vimtype %s is not supported.", vim_type)
            return
        collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id)
        status = collector.is_vim_ok()
        status_metric = Metric({'vim_id': vim_account_id}, 'vim_status', status)
        self.queue.put(status_metric)

    def _collect_vca_metrics(self, vnfr: dict):
        """Collect the VCA metrics of one VNFR and queue them.

        Used as the target of a child process by ``collect_metrics``.

        :param vnfr: VNF record passed to the VCA collector's ``collect``.
        """
        log.debug('_collect_vca_metrics')
        log.debug('vnfr: %s', vnfr)
        vca_collector = VCACollector(self.conf)
        for metric in vca_collector.collect(vnfr):
            self.queue.put(metric)

    @staticmethod
    def _start_process(target, args):
        """Create and start a child process running ``target(*args)``."""
        process = multiprocessing.Process(target=target, args=args)
        process.start()
        return process

    def collect_metrics(self):
        """Run one collection cycle and pass the results to the backends.

        Starts the child processes, waits up to ten seconds for each of
        them, drains the shared queue and calls ``handle`` on every backend
        with the collected list (which may be empty).
        """
        join_timeout = 10
        processes = []
        for vnfr in self.common_db.get_vnfrs():
            nsr_id = vnfr['nsr-id-ref']
            vnf_member_index = vnfr['member-vnf-index-ref']
            vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index)
            processes.append(
                self._start_process(self._collect_vim_metrics, (vnfr, vim_account_id)))
            processes.append(
                self._start_process(self._collect_vca_metrics, (vnfr,)))
        for vim in self.common_db.get_vim_accounts():
            processes.append(
                self._start_process(self._collect_vim_infra_metrics, (vim['_id'],)))

        for process in processes:
            process.join(timeout=join_timeout)
            if process.is_alive():
                # Make stragglers visible: their metrics are not part of this
                # cycle. They are not terminated, because the multiprocessing
                # docs warn that terminating a process that is using a queue
                # is liable to corrupt that queue.
                log.warning("Collector process %s still running after %s seconds",
                            process.name, join_timeout)

        metrics = []
        while not self.queue.empty():
            metrics.append(self.queue.get())
        for plugin in self.plugins:
            plugin.handle(metrics)

    def _init_backends(self):
        """Instantiate every class in ``METRIC_BACKENDS`` into ``self.plugins``."""
        for backend in METRIC_BACKENDS:
            self.plugins.append(backend())