Resolved 1698 - MON fails to collect metrics from an OpenStack VIM with metrics suppo...
[osm/MON.git] / osm_mon / collector / service.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23
24 # This version uses a ProcessThreadPoolExecutor to limit the number of processes launched
25
26 import logging
27 from typing import List
28 import concurrent.futures
29 import time
30 import keystoneauth1.exceptions
31
32 from osm_mon.collector.infra_collectors.onos import OnosInfraCollector
33 from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector
34 from osm_mon.collector.infra_collectors.vio import VIOInfraCollector
35 from osm_mon.collector.infra_collectors.vmware import VMwareInfraCollector
36 from osm_mon.collector.metric import Metric
37 from osm_mon.collector.vnf_collectors.juju import VCACollector
38 from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
39 from osm_mon.collector.vnf_collectors.vio import VIOCollector
40 from osm_mon.collector.vnf_collectors.vmware import VMwareCollector
41 from osm_mon.core.common_db import CommonDbClient
42 from osm_mon.core.config import Config
43
44 log = logging.getLogger(__name__)
45
46 VIM_COLLECTORS = {
47 "openstack": OpenstackCollector,
48 "vmware": VMwareCollector,
49 "vio": VIOCollector
50 }
51 VIM_INFRA_COLLECTORS = {
52 "openstack": OpenstackInfraCollector,
53 "vmware": VMwareInfraCollector,
54 "vio": VIOInfraCollector
55 }
56 SDN_INFRA_COLLECTORS = {
57 "onosof": OnosInfraCollector,
58 "onos_vpls": OnosInfraCollector
59 }
60
61
62 class CollectorService:
63
64 def __init__(self, config: Config):
65 self.conf = config
66 self.common_db = CommonDbClient(self.conf)
67 return
68
69 # static methods to be executed in the Processes
70 @staticmethod
71 def _get_vim_type(conf: Config, vim_account_id: str) -> str:
72 common_db = CommonDbClient(conf)
73 vim_account = common_db.get_vim_account(vim_account_id)
74 vim_type = vim_account['vim_type']
75 if 'config' in vim_account and 'vim_type' in vim_account['config']:
76 vim_type = vim_account['config']['vim_type'].lower()
77 if vim_type == 'vio' and 'vrops_site' not in vim_account['config']:
78 vim_type = 'openstack'
79 return vim_type
80
81 @staticmethod
82 def _collect_vim_metrics(conf: Config, vnfr: dict, vim_account_id: str):
83 # TODO(diazb) Add support for aws
84 metrics = []
85 vim_type = CollectorService._get_vim_type(conf, vim_account_id)
86 log.debug("vim type.....{}".format(vim_type))
87 if vim_type in VIM_COLLECTORS:
88 collector = VIM_COLLECTORS[vim_type](conf, vim_account_id)
89 metrics = collector.collect(vnfr)
90 log.debug("Collecting vim metrics.....{}".format(metrics))
91 else:
92 log.debug("vimtype %s is not supported.", vim_type)
93 return metrics
94
95 @staticmethod
96 def _collect_vca_metrics(conf: Config, vnfr: dict):
97 metrics = []
98 vca_collector = VCACollector(conf)
99 metrics = vca_collector.collect(vnfr)
100 log.debug("Collecting vca metrics.....{}".format(metrics))
101 return metrics
102
103 @staticmethod
104 def _collect_vim_infra_metrics(conf: Config, vim_account_id: str):
105 log.info("Collecting vim infra metrics")
106 metrics = []
107 vim_type = CollectorService._get_vim_type(conf, vim_account_id)
108 if vim_type in VIM_INFRA_COLLECTORS:
109 collector = VIM_INFRA_COLLECTORS[vim_type](conf, vim_account_id)
110 metrics = collector.collect()
111 log.debug("Collecting vim infra metrics.....{}".format(metrics))
112 else:
113 log.debug("vimtype %s is not supported.", vim_type)
114 return metrics
115
116 @staticmethod
117 def _collect_sdnc_infra_metrics(conf: Config, sdnc_id: str):
118 log.info("Collecting sdnc metrics")
119 metrics = []
120 common_db = CommonDbClient(conf)
121 sdn_type = common_db.get_sdnc(sdnc_id)['type']
122 if sdn_type in SDN_INFRA_COLLECTORS:
123 collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id)
124 metrics = collector.collect()
125 log.debug("Collecting sdnc metrics.....{}".format(metrics))
126 else:
127 log.debug("sdn_type %s is not supported.", sdn_type)
128 return metrics
129
130 @staticmethod
131 def _stop_process_pool(executor):
132 log.info('Shutting down process pool')
133 try:
134 log.debug('Stopping residual processes in the process pool')
135 for pid, process in executor._processes.items():
136 if process.is_alive():
137 process.terminate()
138 except Exception as e:
139 log.info("Exception during process termination")
140 log.debug("Exception %s" % (e))
141
142 try:
143 # Shutting down executor
144 log.debug('Shutting down process pool executor')
145 executor.shutdown()
146 except RuntimeError as e:
147 log.info('RuntimeError in shutting down executer')
148 log.debug('RuntimeError %s' % (e))
149 return
150
151 def collect_metrics(self) -> List[Metric]:
152 vnfrs = self.common_db.get_vnfrs()
153 metrics = []
154
155 start_time = time.time()
156 # Starting executor pool with pool size process_pool_size. Default process_pool_size is 20
157 with concurrent.futures.ProcessPoolExecutor(self.conf.get('collector', 'process_pool_size')) as executor:
158 log.info('Started metric collector process pool with pool size %s' % (self.conf.get('collector',
159 'process_pool_size')))
160 futures = []
161 for vnfr in vnfrs:
162 nsr_id = vnfr['nsr-id-ref']
163 vnf_member_index = vnfr['member-vnf-index-ref']
164 vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index)
165 futures.append(executor.submit(CollectorService._collect_vim_metrics, self.conf, vnfr, vim_account_id))
166 futures.append(executor.submit(CollectorService._collect_vca_metrics, self.conf, vnfr))
167
168 vims = self.common_db.get_vim_accounts()
169 for vim in vims:
170 futures.append(executor.submit(CollectorService._collect_vim_infra_metrics, self.conf, vim['_id']))
171
172 sdncs = self.common_db.get_sdncs()
173 for sdnc in sdncs:
174 futures.append(executor.submit(CollectorService._collect_sdnc_infra_metrics, self.conf, sdnc['_id']))
175
176 try:
177 # Wait for future calls to complete till process_execution_timeout. Default is 50 seconds
178 for future in concurrent.futures.as_completed(futures, self.conf.get('collector',
179 'process_execution_timeout')):
180 try:
181 result = future.result(timeout=int(self.conf.get('collector',
182 'process_execution_timeout')))
183 metrics.extend(result)
184 log.debug('result = %s' % (result))
185 except keystoneauth1.exceptions.connection.ConnectionError as e:
186 log.info("Keystone connection error during metric collection")
187 log.debug("Keystone connection error exception %s" % (e))
188 except concurrent.futures.TimeoutError as e:
189 # Some processes have not completed due to timeout error
190 log.info('Some processes have not finished due to TimeoutError exception')
191 log.debug('concurrent.futures.TimeoutError exception %s' % (e))
192
193 # Shutting down process pool executor
194 CollectorService._stop_process_pool(executor)
195
196 end_time = time.time()
197 log.info("Collection completed in %s seconds", end_time - start_time)
198
199 return metrics