aa270833881f9a50e6bd357c812a59d4fedb3771
[osm/MON.git] / osm_mon / collector / service.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright 2018 Whitestack, LLC
4 # *************************************************************
5
6 # This file is part of OSM Monitoring module
7 # All Rights Reserved to Whitestack, LLC
8
9 # Licensed under the Apache License, Version 2.0 (the "License"); you may
10 # not use this file except in compliance with the License. You may obtain
11 # a copy of the License at
12
13 # http://www.apache.org/licenses/LICENSE-2.0
14
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # License for the specific language governing permissions and limitations
19 # under the License.
20 # For those usages not covered by the Apache License, Version 2.0 please
21 # contact: bdiaz@whitestack.com or glavado@whitestack.com
22 ##
23
# This version uses a ProcessPoolExecutor to limit the number of processes launched
25
26 import logging
27 from typing import List
28 import concurrent.futures
29 import time
30
31 from osm_mon.collector.infra_collectors.onos import OnosInfraCollector
32 from osm_mon.collector.infra_collectors.openstack import OpenstackInfraCollector
33 from osm_mon.collector.infra_collectors.vio import VIOInfraCollector
34 from osm_mon.collector.infra_collectors.vmware import VMwareInfraCollector
35 from osm_mon.collector.metric import Metric
36 from osm_mon.collector.vnf_collectors.juju import VCACollector
37 from osm_mon.collector.vnf_collectors.openstack import OpenstackCollector
38 from osm_mon.collector.vnf_collectors.vio import VIOCollector
39 from osm_mon.collector.vnf_collectors.vmware import VMwareCollector
40 from osm_mon.core.common_db import CommonDbClient
41 from osm_mon.core.config import Config
42
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# VNF metric collector classes, looked up by VIM type.
VIM_COLLECTORS = {
    "openstack": OpenstackCollector,
    "vmware": VMwareCollector,
    "vio": VIOCollector,
}

# Infrastructure metric collector classes, looked up by VIM type.
VIM_INFRA_COLLECTORS = {
    "openstack": OpenstackInfraCollector,
    "vmware": VMwareInfraCollector,
    "vio": VIOInfraCollector,
}

# Infrastructure metric collector classes, looked up by SDN controller type.
# Both ONOS controller types are served by the same collector class.
SDN_INFRA_COLLECTORS = {
    "onosof": OnosInfraCollector,
    "onos_vpls": OnosInfraCollector,
}
56
57
class CollectorService:
    """Collect VNF, VIM infrastructure and SDN controller metrics.

    ``collect_metrics`` submits one task per collection target to a
    ``concurrent.futures.ProcessPoolExecutor`` and merges whatever the tasks
    return.  The ``_collect_*`` static methods are the task bodies; each one
    receives the configuration as an argument and builds its own DB client
    and collector objects from it.
    """

    def __init__(self, config: Config):
        """Keep the configuration and create a common DB client from it.

        Args:
            config: configuration object; it is also handed to every
                collection task.
        """
        self.conf = config
        self.common_db = CommonDbClient(self.conf)

    # static methods to be executed in the Processes
    @staticmethod
    def _get_vim_type(conf: Config, vim_account_id: str) -> str:
        """Return the VIM type used to pick a collector for a VIM account.

        The account's top-level ``vim_type`` is the starting value.  If the
        account's ``config`` section also has a ``vim_type`` entry, that
        entry (lower-cased) replaces it.  A resulting ``"vio"`` type is
        downgraded to ``"openstack"`` when the config has no ``vrops_site``
        entry.

        Args:
            conf: configuration used to create the DB client.
            vim_account_id: identifier of the VIM account to look up.

        Returns:
            The resolved VIM type string.
        """
        common_db = CommonDbClient(conf)
        vim_account = common_db.get_vim_account(vim_account_id)
        vim_type = vim_account["vim_type"]
        # The "config" section may be missing (or empty).  Resolve it once so
        # that the "vio" check below cannot fail on an account without it.
        # NOTE(review): assumes the account record is a dict - confirm.
        vim_config = vim_account.get("config") or {}
        if "vim_type" in vim_config:
            vim_type = vim_config["vim_type"].lower()
        if vim_type == "vio" and "vrops_site" not in vim_config:
            vim_type = "openstack"
        return vim_type

    @staticmethod
    def _collect_vim_metrics(conf: Config, vnfr: dict, vim_account_id: str):
        """Collect the metrics of one VNF record through its VIM collector.

        Args:
            conf: configuration passed to the collector.
            vnfr: VNF record handed to the collector's ``collect``.
            vim_account_id: VIM account the VNF is deployed on.

        Returns:
            The value returned by the collector's ``collect(vnfr)``, or an
            empty list when no collector is registered for the VIM type.
        """
        # TODO(diazb) Add support for aws
        vim_type = CollectorService._get_vim_type(conf, vim_account_id)
        log.debug("vim type.....%s", vim_type)
        if vim_type not in VIM_COLLECTORS:
            log.debug("vimtype %s is not supported.", vim_type)
            return []
        collector = VIM_COLLECTORS[vim_type](conf, vim_account_id)
        metrics = collector.collect(vnfr)
        log.debug("Collecting vim metrics.....%s", metrics)
        return metrics

    @staticmethod
    def _collect_vca_metrics(conf: Config, vnfr: dict):
        """Collect the metrics of one VNF record through ``VCACollector``.

        Args:
            conf: configuration passed to the collector.
            vnfr: VNF record handed to the collector's ``collect``.

        Returns:
            The value returned by ``VCACollector(conf).collect(vnfr)``.
        """
        vca_collector = VCACollector(conf)
        metrics = vca_collector.collect(vnfr)
        log.debug("Collecting vca metrics.....%s", metrics)
        return metrics

    @staticmethod
    def _collect_vim_infra_metrics(conf: Config, vim_account_id: str):
        """Collect the infrastructure metrics of one VIM account.

        Args:
            conf: configuration passed to the collector.
            vim_account_id: identifier of the VIM account.

        Returns:
            The value returned by the collector's ``collect()``, or an empty
            list when no collector is registered for the VIM type.
        """
        log.info("Collecting vim infra metrics")
        vim_type = CollectorService._get_vim_type(conf, vim_account_id)
        if vim_type not in VIM_INFRA_COLLECTORS:
            log.debug("vimtype %s is not supported.", vim_type)
            return []
        collector = VIM_INFRA_COLLECTORS[vim_type](conf, vim_account_id)
        metrics = collector.collect()
        log.debug("Collecting vim infra metrics.....%s", metrics)
        return metrics

    @staticmethod
    def _collect_sdnc_infra_metrics(conf: Config, sdnc_id: str):
        """Collect the infrastructure metrics of one SDN controller.

        Args:
            conf: configuration used for the DB client and the collector.
            sdnc_id: identifier of the SDN controller.

        Returns:
            The value returned by the collector's ``collect()``, or an empty
            list when no collector is registered for the controller type.
        """
        log.info("Collecting sdnc metrics")
        common_db = CommonDbClient(conf)
        sdn_type = common_db.get_sdnc(sdnc_id)["type"]
        if sdn_type not in SDN_INFRA_COLLECTORS:
            log.debug("sdn_type %s is not supported.", sdn_type)
            return []
        collector = SDN_INFRA_COLLECTORS[sdn_type](conf, sdnc_id)
        metrics = collector.collect()
        log.debug("Collecting sdnc metrics.....%s", metrics)
        return metrics

    @staticmethod
    def _stop_process_pool(executor):
        """Terminate the pool's live processes, then shut the executor down.

        Both steps are best effort: a failure in either one is logged and
        never raised to the caller.

        Args:
            executor: the process pool executor to stop.
        """
        log.info("Shutting down process pool")
        try:
            log.debug("Stopping residual processes in the process pool")
            # _processes is a private attribute of the executor; iterate over
            # a snapshot of its values so the loop does not walk the live
            # mapping while processes are being terminated.
            for process in list(executor._processes.values()):
                if process.is_alive():
                    process.terminate()
        except Exception as e:
            log.info("Exception during process termination")
            log.debug("Exception %s", e)

        try:
            # Shutting down executor
            log.debug("Shutting down process pool executor")
            executor.shutdown()
        except RuntimeError as e:
            log.info("RuntimeError in shutting down executer")
            log.debug("RuntimeError %s", e)

    def collect_metrics(self) -> List[Metric]:
        """Run every collection task in a process pool and merge the results.

        Tasks submitted: one VIM task and one VCA task per VNF record, one
        infrastructure task per VIM account and one per SDN controller.
        Results are gathered as tasks complete, for at most
        ``collector.process_execution_timeout`` seconds.  Tasks that have not
        finished by then are abandoned, and a task that raised is logged and
        skipped; in both cases the metrics gathered from the other tasks are
        still returned.

        Returns:
            The concatenation of the metric lists returned by the tasks that
            completed successfully.
        """
        vnfrs = self.common_db.get_vnfrs()
        metrics = []

        start_time = time.time()

        # Read each setting once and normalise it to int, so that the
        # executor and as_completed() both receive a number even when the
        # configuration holds the value as a string.
        pool_size = self.conf.get("collector", "process_pool_size")
        if pool_size is not None:
            pool_size = int(pool_size)
        execution_timeout = int(
            self.conf.get("collector", "process_execution_timeout")
        )

        # Starting executor pool with pool size process_pool_size. Default process_pool_size is 20
        with concurrent.futures.ProcessPoolExecutor(pool_size) as executor:
            log.info(
                "Started metric collector process pool with pool size %s", pool_size
            )
            futures = []
            for vnfr in vnfrs:
                nsr_id = vnfr["nsr-id-ref"]
                vnf_member_index = vnfr["member-vnf-index-ref"]
                vim_account_id = self.common_db.get_vim_account_id(
                    nsr_id, vnf_member_index
                )
                futures.append(
                    executor.submit(
                        CollectorService._collect_vim_metrics,
                        self.conf,
                        vnfr,
                        vim_account_id,
                    )
                )
                futures.append(
                    executor.submit(
                        CollectorService._collect_vca_metrics, self.conf, vnfr
                    )
                )

            for vim in self.common_db.get_vim_accounts():
                futures.append(
                    executor.submit(
                        CollectorService._collect_vim_infra_metrics,
                        self.conf,
                        vim["_id"],
                    )
                )

            for sdnc in self.common_db.get_sdncs():
                futures.append(
                    executor.submit(
                        CollectorService._collect_sdnc_infra_metrics,
                        self.conf,
                        sdnc["_id"],
                    )
                )

            try:
                # Wait for future calls to complete till process_execution_timeout. Default is 50 seconds
                for future in concurrent.futures.as_completed(
                    futures, execution_timeout
                ):
                    try:
                        # as_completed() only yields finished futures, so
                        # result() returns (or raises) without blocking.
                        result = future.result()
                    except Exception:
                        # One failing task must not discard the metrics of
                        # the others, nor be mistaken for a pool timeout.
                        log.exception(
                            "Metric collection task failed; skipping its results"
                        )
                        continue
                    metrics.extend(result)
                    log.debug("result = %s", result)
            except concurrent.futures.TimeoutError as e:
                # Some processes have not completed due to timeout error
                log.info(
                    "Some processes have not finished due to TimeoutError exception"
                )
                log.debug("concurrent.futures.TimeoutError exception %s", e)

            # Shutting down process pool executor
            CollectorService._stop_process_pool(executor)

        end_time = time.time()
        log.info("Collection completed in %s seconds", end_time - start_time)

        return metrics