blob: aa270833881f9a50e6bd357c812a59d4fedb3771 [file] [log] [blame]
Benjamin Diaz416a7532019-07-29 12:00:38 -03001# -*- coding: utf-8 -*-
2
3# Copyright 2018 Whitestack, LLC
4# *************************************************************
5
6# This file is part of OSM Monitoring module
7# All Rights Reserved to Whitestack, LLC
8
9# Licensed under the Apache License, Version 2.0 (the "License"); you may
10# not use this file except in compliance with the License. You may obtain
11# a copy of the License at
12
13# http://www.apache.org/licenses/LICENSE-2.0
14
15# Unless required by applicable law or agreed to in writing, software
16# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18# License for the specific language governing permissions and limitations
19# under the License.
20# For those usages not covered by the Apache License, Version 2.0 please
21# contact: bdiaz@whitestack.com or glavado@whitestack.com
22##
palsus9a773322021-01-20 18:26:13 +000023
# This version uses a ProcessPoolExecutor to limit the number of processes launched
25
Benjamin Diaza97bdb32019-04-10 15:22:22 -030026import logging
Benjamin Diaza97bdb32019-04-10 15:22:22 -030027from typing import List
palsus9a773322021-01-20 18:26:13 +000028import concurrent.futures
29import time
Benjamin Diaza97bdb32019-04-10 15:22:22 -030030
31from osm_mon.collector.infra_collectors.onos import OnosInfraCollector
32from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector
kasare82c0622019-05-09 00:55:30 -070033from osm_mon.collector.infra_collectors.vio import VIOInfraCollector
Benjamin Diaz416a7532019-07-29 12:00:38 -030034from osm_mon.collector.infra_collectors.vmware import VMwareInfraCollector
Benjamin Diaza97bdb32019-04-10 15:22:22 -030035from osm_mon.collector.metric import Metric
Benjamin Diaza97bdb32019-04-10 15:22:22 -030036from osm_mon.collector.vnf_collectors.juju import VCACollector
37from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
38from osm_mon.collector.vnf_collectors.vio import VIOCollector
39from osm_mon.collector.vnf_collectors.vmware import VMwareCollector
40from osm_mon.core.common_db import CommonDbClient
41from osm_mon.core.config import Config
42
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Registries mapping a type string to the collector class that handles it.
# CollectorService looks the type up here and logs a debug message when it
# is not present.

# Collectors invoked with a VNF record: collector.collect(vnfr).
VIM_COLLECTORS = {
    "openstack": OpenstackCollector,
    "vmware": VMwareCollector,
    "vio": VIOCollector,
}

# Collectors invoked without arguments for a VIM account: collector.collect().
VIM_INFRA_COLLECTORS = {
    "openstack": OpenstackInfraCollector,
    "vmware": VMwareInfraCollector,
    "vio": VIOInfraCollector,
}

# Both ONOS SDN controller types are served by the same collector class.
SDN_INFRA_COLLECTORS = {
    "onosof": OnosInfraCollector,
    "onos_vpls": OnosInfraCollector,
}
Benjamin Diaza97bdb32019-04-10 15:22:22 -030056
57
class CollectorService:
    """Collect VNF, VCA, VIM-infrastructure and SDN-controller metrics.

    Every collection task is submitted to a
    ``concurrent.futures.ProcessPoolExecutor`` whose size comes from the
    ``collector.process_pool_size`` setting. Results are gathered until
    ``collector.process_execution_timeout`` seconds have elapsed; tasks
    that have not finished by then are abandoned and their worker
    processes terminated.
    """

    def __init__(self, config: Config):
        """Store the configuration and open a common-DB client.

        :param config: configuration object; also passed to every
            collection task.
        """
        self.conf = config
        self.common_db = CommonDbClient(self.conf)

    # Static methods to be executed in the worker processes. Each one
    # creates whatever clients it needs from ``conf`` instead of using
    # state held by a CollectorService instance.
    @staticmethod
    def _get_vim_type(conf: Config, vim_account_id: str) -> str:
        """Return the collector type to use for a VIM account.

        The account's top-level ``vim_type`` is used unless its ``config``
        section carries its own ``vim_type``, which then takes precedence
        (lower-cased). In that case a ``vio`` account without a
        ``vrops_site`` entry in ``config`` is reported as ``openstack``.

        :param conf: configuration used to build the common-DB client.
        :param vim_account_id: identifier of the VIM account to look up.
        :return: the resolved VIM type string.
        """
        common_db = CommonDbClient(conf)
        vim_account = common_db.get_vim_account(vim_account_id)
        vim_type = vim_account["vim_type"]
        if "config" in vim_account and "vim_type" in vim_account["config"]:
            vim_type = vim_account["config"]["vim_type"].lower()
            if vim_type == "vio" and "vrops_site" not in vim_account["config"]:
                vim_type = "openstack"
        return vim_type

    @staticmethod
    def _collect_vim_metrics(conf: Config, vnfr: dict, vim_account_id: str):
        """Collect the metrics of one VNF record from its VIM.

        :param conf: configuration handed to the collector.
        :param vnfr: VNF record to collect metrics for.
        :param vim_account_id: VIM account the VNF is deployed on.
        :return: whatever the collector's ``collect(vnfr)`` returns, or an
            empty list when the VIM type has no entry in VIM_COLLECTORS.
        """
        # TODO(diazb) Add support for aws
        metrics = []
        vim_type = CollectorService._get_vim_type(conf, vim_account_id)
        log.debug("vim type.....%s", vim_type)
        if vim_type in VIM_COLLECTORS:
            collector = VIM_COLLECTORS[vim_type](conf, vim_account_id)
            metrics = collector.collect(vnfr)
            log.debug("Collecting vim metrics.....%s", metrics)
        else:
            log.debug("vimtype %s is not supported.", vim_type)
        return metrics

    @staticmethod
    def _collect_vca_metrics(conf: Config, vnfr: dict):
        """Collect the VCA metrics of one VNF record.

        :param conf: configuration handed to the VCA collector.
        :param vnfr: VNF record to collect metrics for.
        :return: whatever ``VCACollector.collect(vnfr)`` returns.
        """
        vca_collector = VCACollector(conf)
        metrics = vca_collector.collect(vnfr)
        log.debug("Collecting vca metrics.....%s", metrics)
        return metrics

    @staticmethod
    def _collect_vim_infra_metrics(conf: Config, vim_account_id: str):
        """Collect infrastructure metrics for one VIM account.

        :param conf: configuration handed to the collector.
        :param vim_account_id: VIM account to collect metrics for.
        :return: whatever the collector's ``collect()`` returns, or an
            empty list when the VIM type has no entry in
            VIM_INFRA_COLLECTORS.
        """
        log.info("Collecting vim infra metrics")
        metrics = []
        vim_type = CollectorService._get_vim_type(conf, vim_account_id)
        if vim_type in VIM_INFRA_COLLECTORS:
            collector = VIM_INFRA_COLLECTORS[vim_type](conf, vim_account_id)
            metrics = collector.collect()
            log.debug("Collecting vim infra metrics.....%s", metrics)
        else:
            log.debug("vimtype %s is not supported.", vim_type)
        return metrics

    @staticmethod
    def _collect_sdnc_infra_metrics(conf: Config, sdnc_id: str):
        """Collect infrastructure metrics for one SDN controller.

        :param conf: configuration handed to the collector.
        :param sdnc_id: identifier of the SDN controller.
        :return: whatever the collector's ``collect()`` returns, or an
            empty list when the controller type has no entry in
            SDN_INFRA_COLLECTORS.
        """
        log.info("Collecting sdnc metrics")
        metrics = []
        common_db = CommonDbClient(conf)
        sdn_type = common_db.get_sdnc(sdnc_id)["type"]
        if sdn_type in SDN_INFRA_COLLECTORS:
            collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id)
            metrics = collector.collect()
            log.debug("Collecting sdnc metrics.....%s", metrics)
        else:
            log.debug("sdn_type %s is not supported.", sdn_type)
        return metrics

    @staticmethod
    def _stop_process_pool(executor):
        """Terminate any live worker process, then shut the executor down.

        Both steps are best effort: failures are logged and never raised.

        :param executor: the process pool executor to stop.
        """
        log.info("Shutting down process pool")
        try:
            log.debug("Stopping residual processes in the process pool")
            # _processes is a private attribute of the executor and may be
            # missing or None; fall back to an empty mapping in that case.
            processes = getattr(executor, "_processes", None) or {}
            # Iterate over a snapshot so the mapping may change meanwhile.
            for process in list(processes.values()):
                if process.is_alive():
                    process.terminate()
        except Exception as e:
            log.info("Exception during process termination")
            log.debug("Exception %s", e)

        try:
            # Shutting down executor
            log.debug("Shutting down process pool executor")
            executor.shutdown()
        except RuntimeError as e:
            log.info("RuntimeError in shutting down executer")
            log.debug("RuntimeError %s", e)

    @staticmethod
    def _conf_int(conf: Config, option: str):
        """Read an option of the ``collector`` section as an int.

        :param conf: configuration object to read from.
        :param option: option name inside the ``collector`` section.
        :return: the value converted with ``int()``, or None when the
            configuration returns None for it.
        """
        value = conf.get("collector", option)
        return None if value is None else int(value)

    def _submit_tasks(self, executor, vnfrs) -> list:
        """Submit every collection task to the executor.

        Two tasks (VIM and VCA metrics) are submitted per VNF record, one
        per VIM account and one per SDN controller.

        :param executor: executor the tasks are submitted to.
        :param vnfrs: VNF records to collect metrics for.
        :return: the list of futures, in submission order.
        """
        futures = []
        for vnfr in vnfrs:
            nsr_id = vnfr["nsr-id-ref"]
            vnf_member_index = vnfr["member-vnf-index-ref"]
            vim_account_id = self.common_db.get_vim_account_id(
                nsr_id, vnf_member_index
            )
            futures.append(
                executor.submit(
                    CollectorService._collect_vim_metrics,
                    self.conf,
                    vnfr,
                    vim_account_id,
                )
            )
            futures.append(
                executor.submit(CollectorService._collect_vca_metrics, self.conf, vnfr)
            )

        for vim in self.common_db.get_vim_accounts():
            futures.append(
                executor.submit(
                    CollectorService._collect_vim_infra_metrics,
                    self.conf,
                    vim["_id"],
                )
            )

        for sdnc in self.common_db.get_sdncs():
            futures.append(
                executor.submit(
                    CollectorService._collect_sdnc_infra_metrics,
                    self.conf,
                    sdnc["_id"],
                )
            )
        return futures

    @staticmethod
    def _gather_results(futures, timeout) -> list:
        """Merge the results of the futures that complete within ``timeout``.

        A task that raised is logged and skipped so that one failing
        collector does not discard the metrics of all the others. When the
        timeout expires the results gathered so far are returned.

        :param futures: futures returned by the executor.
        :param timeout: overall number of seconds to wait, or None to wait
            without limit.
        :return: the concatenated results of the successful tasks.
        """
        metrics = []
        try:
            # Wait for future calls to complete till process_execution_timeout.
            for future in concurrent.futures.as_completed(futures, timeout):
                try:
                    result = future.result(timeout=timeout)
                except Exception as e:
                    log.warning(
                        "Metric collection task failed, skipping its metrics: %s", e
                    )
                    continue
                if result:
                    metrics.extend(result)
                log.debug("result = %s", result)
        except concurrent.futures.TimeoutError as e:
            # Some processes have not completed due to timeout error
            log.info("Some processes have not finished due to TimeoutError exception")
            log.debug("concurrent.futures.TimeoutError exception %s", e)
        return metrics

    def collect_metrics(self) -> List[Metric]:
        """Run every collection task in a process pool and merge the results.

        :return: the metrics of all tasks that completed successfully
            within the configured timeout.
        """
        vnfrs = self.common_db.get_vnfrs()

        start_time = time.time()
        # Read both settings once and normalise them to int so the executor,
        # as_completed() and future.result() all receive the same type.
        pool_size = CollectorService._conf_int(self.conf, "process_pool_size")
        timeout = CollectorService._conf_int(self.conf, "process_execution_timeout")

        # Starting executor pool with pool size process_pool_size.
        with concurrent.futures.ProcessPoolExecutor(pool_size) as executor:
            log.info(
                "Started metric collector process pool with pool size %s", pool_size
            )
            try:
                futures = self._submit_tasks(executor, vnfrs)
                metrics = CollectorService._gather_results(futures, timeout)
            finally:
                # Always stop the pool, even when an error escapes, so the
                # ``with`` exit does not wait on workers that never finish.
                CollectorService._stop_process_pool(executor)

        end_time = time.time()
        log.info("Collection completed in %s seconds", end_time - start_time)

        return metrics