3 from typing
import List
5 from osm_mon
.collector
.infra_collectors
.onos
import OnosInfraCollector
6 from osm_mon
.collector
.infra_collectors
.openstack
import OpenstackInfraCollector
7 from osm_mon
.collector
.infra_collectors
.vmware
import VMwareInfraCollector
8 from osm_mon
.collector
.metric
import Metric
9 from osm_mon
.collector
.utils
import CollectorUtils
10 from osm_mon
.collector
.vnf_collectors
.juju
import VCACollector
11 from osm_mon
.collector
.vnf_collectors
.openstack
import OpenstackCollector
12 from osm_mon
.collector
.vnf_collectors
.vio
import VIOCollector
13 from osm_mon
.collector
.vnf_collectors
.vmware
import VMwareCollector
14 from osm_mon
.core
.common_db
import CommonDbClient
15 from osm_mon
.core
.config
import Config
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
20 "openstack": OpenstackCollector
,
21 "vmware": VMwareCollector
,
# Registry of VIM infrastructure collector classes, keyed by VIM type string.
# _collect_vim_infra_metrics looks the type up here and instantiates the class.
VIM_INFRA_COLLECTORS = dict(
    openstack=OpenstackInfraCollector,
    vmware=VMwareInfraCollector,
)

# Registry of SDN controller infrastructure collector classes, keyed by the
# controller's type string. _collect_sdnc_infra_metrics looks the type up here.
SDN_INFRA_COLLECTORS = dict(
    onos=OnosInfraCollector,
)
33 class CollectorService
:
def __init__(self, config: Config):
    """Initialise the service with its configuration, DB client and queue.

    Args:
        config: Configuration object. It is stored as ``self.conf`` and
            handed to the common DB client here and to every collector
            built by the ``_collect_*`` methods.
    """
    # Store the configuration first: the DB client below and all of the
    # _collect_* methods read it back through self.conf.
    self.conf = config
    self.common_db = CommonDbClient(self.conf)
    # Queue onto which the _collect_* methods put the metrics they gather.
    self.queue = multiprocessing.Queue()
def _collect_vim_metrics(self, vnfr: dict, vim_account_id: str):
    """Collect VIM metrics for one VNF record and put them on the queue.

    The VIM type is resolved from ``vim_account_id``. When a collector is
    registered for that type in ``VIM_COLLECTORS``, it is instantiated, run
    against ``vnfr``, and every metric it returns is put on ``self.queue``.
    For an unregistered type only a debug message is logged.

    Args:
        vnfr: VNF record passed to the collector's ``collect`` method.
        vim_account_id: VIM account identifier, used both to resolve the
            VIM type and to construct the collector.
    """
    # TODO(diazb) Add support for aws
    vim_type = CollectorUtils.get_vim_type(vim_account_id)

    # Guard clause: nothing to collect for a VIM type without a collector.
    if vim_type not in VIM_COLLECTORS:
        log.debug("vimtype %s is not supported.", vim_type)
        return

    collector_cls = VIM_COLLECTORS[vim_type]
    vim_collector = collector_cls(self.conf, vim_account_id)
    for collected in vim_collector.collect(vnfr):
        self.queue.put(collected)
def _collect_vim_infra_metrics(self, vim_account_id: str):
    """Collect infrastructure metrics for one VIM account and enqueue them.

    The VIM type is resolved from ``vim_account_id``. When a collector is
    registered for that type in ``VIM_INFRA_COLLECTORS``, it is instantiated
    and run, and every metric it returns is put on ``self.queue``. For an
    unregistered type only a debug message is logged.

    Args:
        vim_account_id: VIM account identifier, used both to resolve the
            VIM type and to construct the collector.
    """
    vim_type = CollectorUtils.get_vim_type(vim_account_id)

    # Guard clause: nothing to collect for a VIM type without a collector.
    if vim_type not in VIM_INFRA_COLLECTORS:
        log.debug("vimtype %s is not supported.", vim_type)
        return

    collector_cls = VIM_INFRA_COLLECTORS[vim_type]
    infra_collector = collector_cls(self.conf, vim_account_id)
    for collected in infra_collector.collect():
        self.queue.put(collected)
def _collect_sdnc_infra_metrics(self, sdnc_id: str):
    """Collect infrastructure metrics for one SDN controller and enqueue them.

    The controller record is fetched from the common DB and its ``'type'``
    field selects a collector from ``SDN_INFRA_COLLECTORS``. When one is
    registered, it is instantiated and run, and every metric it returns is
    put on ``self.queue``. For an unregistered type only a debug message is
    logged.

    Args:
        sdnc_id: SDN controller identifier, used both to fetch the
            controller record and to construct the collector.
    """
    # A fresh DB client is built here instead of reusing self.common_db.
    # NOTE(review): presumably because this method is run in a child
    # process by collect_metrics — confirm before changing.
    db_client = CommonDbClient(self.conf)
    sdnc_record = db_client.get_sdnc(sdnc_id)
    sdn_type = sdnc_record['type']

    # Guard clause: nothing to collect for an SDN type without a collector.
    if sdn_type not in SDN_INFRA_COLLECTORS:
        log.debug("sdn_type %s is not supported.", sdn_type)
        return

    collector_cls = SDN_INFRA_COLLECTORS[sdn_type]
    sdn_collector = collector_cls(self.conf, sdnc_id)
    for collected in sdn_collector.collect():
        self.queue.put(collected)
def _collect_vca_metrics(self, vnfr: dict):
    """Collect VCA metrics for one VNF record and put them on the queue.

    Logs the call and the record at debug level, runs a ``VCACollector``
    built from ``self.conf`` against ``vnfr``, and puts every metric it
    returns on ``self.queue``.

    Args:
        vnfr: VNF record passed to the collector's ``collect`` method.
    """
    log.debug('_collect_vca_metrics')
    log.debug('vnfr: %s', vnfr)

    collected_metrics = VCACollector(self.conf).collect(vnfr)
    for collected in collected_metrics:
        self.queue.put(collected)
79 def collect_metrics(self
) -> List
[Metric
]:
80 vnfrs
= self
.common_db
.get_vnfrs()
83 nsr_id
= vnfr
['nsr-id-ref']
84 vnf_member_index
= vnfr
['member-vnf-index-ref']
85 vim_account_id
= self
.common_db
.get_vim_account_id(nsr_id
, vnf_member_index
)
86 p
= multiprocessing
.Process(target
=self
._collect
_vim
_metrics
,
87 args
=(vnfr
, vim_account_id
))
90 p
= multiprocessing
.Process(target
=self
._collect
_vca
_metrics
,
94 vims
= self
.common_db
.get_vim_accounts()
96 p
= multiprocessing
.Process(target
=self
._collect
_vim
_infra
_metrics
,
100 sdncs
= self
.common_db
.get_sdncs()
102 p
= multiprocessing
.Process(target
=self
._collect
_sdnc
_infra
_metrics
,
106 for process
in processes
:
107 process
.join(timeout
=10)
109 while not self
.queue
.empty():
110 metrics
.append(self
.queue
.get())