X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_mon%2Fcollector%2Fservice.py;h=005f844e3ffdbb428dc10c06c28cea89db816534;hb=02f4d10c2f14cf08310853c4744dc96aca3d1ceb;hp=98f3f16c1c41dc19be735e7ffef113eb1344bc03;hpb=bf4f4968106ced34a9f82edf41bc868a5268cb84;p=osm%2FMON.git diff --git a/osm_mon/collector/service.py b/osm_mon/collector/service.py index 98f3f16..005f844 100644 --- a/osm_mon/collector/service.py +++ b/osm_mon/collector/service.py @@ -1,13 +1,39 @@ +# -*- coding: utf-8 -*- + +# Copyright 2018 Whitestack, LLC +# ************************************************************* + +# This file is part of OSM Monitoring module +# All Rights Reserved to Whitestack, LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# For those usages not covered by the Apache License, Version 2.0 please +# contact: bdiaz@whitestack.com or glavado@whitestack.com +## + +# This version uses a ProcessThreadPoolExecutor to limit the number of processes launched + import logging -import multiprocessing from typing import List +import concurrent.futures +import time +import keystoneauth1.exceptions from osm_mon.collector.infra_collectors.onos import OnosInfraCollector from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector -from osm_mon.collector.infra_collectors.vmware import VMwareInfraCollector from osm_mon.collector.infra_collectors.vio import VIOInfraCollector +from osm_mon.collector.infra_collectors.vmware import VMwareInfraCollector from osm_mon.collector.metric import Metric -from osm_mon.collector.utils import CollectorUtils from osm_mon.collector.vnf_collectors.juju import VCACollector from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector from osm_mon.collector.vnf_collectors.vio import VIOCollector @@ -20,94 +46,185 @@ log = logging.getLogger(__name__) VIM_COLLECTORS = { "openstack": OpenstackCollector, "vmware": VMwareCollector, - "vio": VIOCollector + "vio": VIOCollector, } VIM_INFRA_COLLECTORS = { "openstack": OpenstackInfraCollector, "vmware": VMwareInfraCollector, - "vio": VIOInfraCollector -} -SDN_INFRA_COLLECTORS = { - "onos": OnosInfraCollector + "vio": VIOInfraCollector, } +SDN_INFRA_COLLECTORS = {"onosof": OnosInfraCollector, "onos_vpls": OnosInfraCollector} class CollectorService: def __init__(self, config: Config): self.conf = config self.common_db = CommonDbClient(self.conf) - self.queue = multiprocessing.Queue() + return - def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str): + # static methods to be executed in the Processes + @staticmethod + def _get_vim_type(conf: Config, vim_account_id: str) -> str: + common_db = CommonDbClient(conf) + vim_account = common_db.get_vim_account(vim_account_id) + vim_type = vim_account["vim_type"] + if "config" in vim_account and "vim_type" in vim_account["config"]: + vim_type = vim_account["config"]["vim_type"].lower() + if vim_type == "vio" and "vrops_site" not in vim_account["config"]: + vim_type = "openstack" + return vim_type + + @staticmethod + def _collect_vim_metrics(conf: Config, vnfr: dict, vim_account_id: str): # TODO(diazb) Add support for aws - vim_type = CollectorUtils.get_vim_type(vim_account_id) + metrics = [] + vim_type = CollectorService._get_vim_type(conf, vim_account_id) + log.debug("vim type.....{}".format(vim_type)) if vim_type in VIM_COLLECTORS: - collector = VIM_COLLECTORS[vim_type](self.conf, vim_account_id) + collector = VIM_COLLECTORS[vim_type](conf, vim_account_id) metrics = collector.collect(vnfr) - for metric in metrics: - self.queue.put(metric) + log.debug("Collecting vim metrics.....{}".format(metrics)) else: log.debug("vimtype %s is not supported.", vim_type) + return metrics + + @staticmethod + def _collect_vca_metrics(conf: Config, vnfr: dict): + metrics = [] + vca_collector = VCACollector(conf) + metrics = vca_collector.collect(vnfr) + log.debug("Collecting vca metrics.....{}".format(metrics)) + return metrics - def _collect_vim_infra_metrics(self, vim_account_id: str): - vim_type = CollectorUtils.get_vim_type(vim_account_id) + @staticmethod + def _collect_vim_infra_metrics(conf: Config, vim_account_id: str): + log.info("Collecting vim infra metrics") + metrics = [] + vim_type = CollectorService._get_vim_type(conf, vim_account_id) if vim_type in VIM_INFRA_COLLECTORS: - collector = VIM_INFRA_COLLECTORS[vim_type](self.conf, vim_account_id) + collector = VIM_INFRA_COLLECTORS[vim_type](conf, vim_account_id) metrics = collector.collect() - for metric in metrics: - self.queue.put(metric) + log.debug("Collecting vim infra metrics.....{}".format(metrics)) else: log.debug("vimtype %s is not supported.", vim_type) + return metrics - def _collect_sdnc_infra_metrics(self, sdnc_id: str): - common_db = CommonDbClient(self.conf) - sdn_type = common_db.get_sdnc(sdnc_id)['type'] + @staticmethod + def _collect_sdnc_infra_metrics(conf: Config, sdnc_id: str): + log.info("Collecting sdnc metrics") + metrics = [] + common_db = CommonDbClient(conf) + sdn_type = common_db.get_sdnc(sdnc_id)["type"] if sdn_type in SDN_INFRA_COLLECTORS: - collector = SDN_INFRA_COLLECTORS[sdn_type](self.conf, sdnc_id) + collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id) metrics = collector.collect() - for metric in metrics: - self.queue.put(metric) + log.debug("Collecting sdnc metrics.....{}".format(metrics)) else: log.debug("sdn_type %s is not supported.", sdn_type) + return metrics - def _collect_vca_metrics(self, vnfr: dict): - log.debug('_collect_vca_metrics') - log.debug('vnfr: %s', vnfr) - vca_collector = VCACollector(self.conf) - metrics = vca_collector.collect(vnfr) - for metric in metrics: - self.queue.put(metric) + @staticmethod + def _stop_process_pool(executor): + log.info("Shutting down process pool") + try: + log.debug("Stopping residual processes in the process pool") + for pid, process in executor._processes.items(): + if process.is_alive(): + process.terminate() + except Exception as e: + log.info("Exception during process termination") + log.debug("Exception %s" % (e)) + + try: + # Shutting down executor + log.debug("Shutting down process pool executor") + executor.shutdown() + except RuntimeError as e: + log.info("RuntimeError in shutting down executer") + log.debug("RuntimeError %s" % (e)) + return def collect_metrics(self) -> List[Metric]: vnfrs = self.common_db.get_vnfrs() - processes = [] - for vnfr in vnfrs: - nsr_id = vnfr['nsr-id-ref'] - vnf_member_index = vnfr['member-vnf-index-ref'] - vim_account_id = self.common_db.get_vim_account_id(nsr_id, vnf_member_index) - p = multiprocessing.Process(target=self._collect_vim_metrics, - args=(vnfr, vim_account_id)) - processes.append(p) - p.start() - p = multiprocessing.Process(target=self._collect_vca_metrics, - args=(vnfr,)) - processes.append(p) - p.start() - vims = self.common_db.get_vim_accounts() - for vim in vims: - p = multiprocessing.Process(target=self._collect_vim_infra_metrics, - args=(vim['_id'],)) - processes.append(p) - p.start() - sdncs = self.common_db.get_sdncs() - for sdnc in sdncs: - p = multiprocessing.Process(target=self._collect_sdnc_infra_metrics, - args=(sdnc['_id'],)) - processes.append(p) - p.start() - for process in processes: - process.join(timeout=10) metrics = [] - while not self.queue.empty(): - metrics.append(self.queue.get()) + + start_time = time.time() + # Starting executor pool with pool size process_pool_size. Default process_pool_size is 20 + with concurrent.futures.ProcessPoolExecutor( + self.conf.get("collector", "process_pool_size") + ) as executor: + log.info( + "Started metric collector process pool with pool size %s" + % (self.conf.get("collector", "process_pool_size")) + ) + futures = [] + for vnfr in vnfrs: + nsr_id = vnfr["nsr-id-ref"] + vnf_member_index = vnfr["member-vnf-index-ref"] + vim_account_id = self.common_db.get_vim_account_id( + nsr_id, vnf_member_index + ) + futures.append( + executor.submit( + CollectorService._collect_vim_metrics, + self.conf, + vnfr, + vim_account_id, + ) + ) + futures.append( + executor.submit( + CollectorService._collect_vca_metrics, self.conf, vnfr + ) + ) + + vims = self.common_db.get_vim_accounts() + for vim in vims: + futures.append( + executor.submit( + CollectorService._collect_vim_infra_metrics, + self.conf, + vim["_id"], + ) + ) + + sdncs = self.common_db.get_sdncs() + for sdnc in sdncs: + futures.append( + executor.submit( + CollectorService._collect_sdnc_infra_metrics, + self.conf, + sdnc["_id"], + ) + ) + + try: + # Wait for future calls to complete till process_execution_timeout. Default is 50 seconds + for future in concurrent.futures.as_completed( + futures, self.conf.get("collector", "process_execution_timeout") + ): + try: + result = future.result( + timeout=int( + self.conf.get("collector", "process_execution_timeout") + ) + ) + metrics.extend(result) + log.debug("result = %s" % (result)) + except keystoneauth1.exceptions.connection.ConnectTimeout as e: + log.info("Keystone connection timeout during metric collection") + log.debug("Keystone connection timeout exception %s" % (e)) + except concurrent.futures.TimeoutError as e: + # Some processes have not completed due to timeout error + log.info( + "Some processes have not finished due to TimeoutError exception" + ) + log.debug("concurrent.futures.TimeoutError exception %s" % (e)) + + # Shutting down process pool executor + CollectorService._stop_process_pool(executor) + + end_time = time.time() + log.info("Collection completed in %s seconds", end_time - start_time) + return metrics