4 # Copyright 2016 RIFT.IO Inc
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
29 import unittest
.mock
as mock
31 from rift
.tasklets
.rwmonparam
import vnfr_core
as vnf_mon_params
32 from rift
.tasklets
.rwmonparam
import nsr_core
as nsr_mon_params
36 gi
.require_version('RwDtsYang', '1.0')
37 from gi
.repository
import (
40 RwLaunchpadYang
as launchpadyang
,
47 import utest_mon_params
50 class MonParamMsgGenerator(object):
51 def __init__(self
, num_messages
=1):
52 ping_path
= r
"/api/v1/ping/stats"
53 self
._endpoint
_msg
= vnfryang
.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint
.from_dict({
56 'polling_interval_secs': 1,
58 'password': 'password',
59 'headers': [{'key': 'TEST_KEY', 'value': 'TEST_VALUE'}],
62 self
._mon
_param
_msgs
= []
63 for i
in range(1, num_messages
):
64 self
._mon
_param
_msgs
.append(vnfryang
.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
.from_dict({
66 'name': 'param_num_%s' % i
,
67 'json_query_method': "NAMEKEY",
68 'http_endpoint_ref': ping_path
,
71 'description': 'desc for param_num_%s' % i
,
72 'group_tag': 'Group1',
73 'widget_type': 'COUNTER',
78 self
._msgs
= iter(self
.mon_param_msgs
)
81 def mon_param_msgs(self
):
82 return self
._mon
_param
_msgs
85 def endpoint_msgs(self
):
86 return [self
._endpoint
_msg
]
88 def next_message(self
):
89 return next(self
._msgs
)
93 class MonParamsDtsTestCase(rift
.test
.dts
.AbstractDTSTest
):
95 def configure_schema(cls
):
96 return launchpadyang
.get_schema()
99 def configure_timeout(cls
):
102 def configure_test(self
, loop
, test_id
):
103 self
.log
.debug("STARTING - %s", test_id
)
104 self
.tinfo
= self
.new_tinfo(str(test_id
))
105 self
.dts
= rift
.tasklets
.DTS(self
.tinfo
, self
.schema
, self
.loop
)
107 self
.tinfo_sub
= self
.new_tinfo(str(test_id
) + "_sub")
108 self
.dts_sub
= rift
.tasklets
.DTS(self
.tinfo_sub
, self
.schema
, self
.loop
)
110 self
.msg_gen
= MonParamMsgGenerator(4)
111 self
.vnf_handler
= vnf_mon_params
.VnfMonitorDtsHandler(
112 self
.log
, self
.dts
, self
.loop
, 1, "1.1.1.1",
113 self
.msg_gen
.mon_param_msgs
, self
.msg_gen
.endpoint_msgs
116 store
= self
.setup_mock_store(aggregation_type
=None,
120 self
.nsr_handler
= nsr_mon_params
.NsrMonitorDtsHandler(
121 self
.log
, self
.dts
, self
.loop
, store
.nsr
[0], [store
.get_vnfr()], store
)
127 def setup_mock_store(self
, aggregation_type
, monps
, legacy
=False):
128 store
= mock
.MagicMock()
130 mock_vnfd
= RwVnfdYang
.YangData_Vnfd_VnfdCatalog_Vnfd
.from_dict({
132 'monitoring_param': [
133 {'description': 'no of ping requests',
134 'group_tag': 'Group1',
135 'http_endpoint_ref': 'api/v1/ping/stats',
137 'json_query_method': 'NAMEKEY',
138 'name': 'ping-request-tx-count',
141 'widget_type': 'COUNTER'},
142 {'description': 'no of ping responses',
143 'group_tag': 'Group1',
144 'http_endpoint_ref': 'api/v1/ping/stats',
146 'json_query_method': 'NAMEKEY',
147 'name': 'ping-response-rx-count',
150 'widget_type': 'COUNTER'}],
152 store
.get_vnfd
= mock
.MagicMock(return_value
=mock_vnfd
)
154 mock_vnfr
= RwVnfrYang
.YangData_Vnfr_VnfrCatalog_Vnfr
.from_dict({
156 'monitoring_param': ([monp
.as_dict() for monp
in monps
] if not legacy
else [])
158 mock_vnfr
.vnfd
= vnfryang
.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd
.from_dict({'id': '1'})
159 store
.get_vnfr
= mock
.MagicMock(return_value
=mock_vnfr
)
161 mock_nsr
= RwNsrYang
.YangData_Nsr_NsInstanceOpdata_Nsr
.from_dict({
162 'ns_instance_config_ref': "1",
164 'constituent_vnfr_ref': [{'vnfr_id': mock_vnfr
.id}],
167 store
.get_nsr
= mock
.MagicMock(return_value
=mock_nsr
)
168 store
.nsr
= [mock_nsr
]
170 monp
= [{'aggregation_type': aggregation_type
,
172 'description': 'no of ping requests',
173 'group_tag': 'Group1',
175 'widget_type': 'COUNTER',
176 'name': 'ping-request-tx-count',
178 'vnfd_monitoring_param': [
180 'vnfd_monitoring_param_ref': '1'},
182 'vnfd_monitoring_param_ref': '2'}]
185 mock_nsd
= RwNsdYang
.YangData_Nsd_NsdCatalog_Nsd
.from_dict({
186 'id': str(uuid
.uuid1()),
187 'monitoring_param': (monp
if not legacy
else [])
190 store
.get_nsd
= mock
.MagicMock(return_value
=mock_nsd
)
195 def get_published_xpaths(self
):
196 published_xpaths
= set()
198 res_iter
= yield from self
.dts
.query_read("D,/rwdts:dts")
200 res
= (yield from i
).result
201 for member
in res
.member
:
202 published_xpaths |
= {reg
.keyspec
for reg
in member
.state
.registration
if reg
.flags
== "publisher"}
204 return published_xpaths
207 def register_vnf_publisher(self
):
208 yield from self
.vnf_handler
.register()
210 def add_param_to_publisher(self
):
211 msg
= self
.msg_gen
.next_message()
212 self
.vnf_handler
.on_update_mon_params([msg
])
216 def register_vnf_test_subscriber(self
, on_prepare
=None):
217 ready_event
= asyncio
.Event(loop
=self
.loop
)
219 # Register needs to wait till reg-ready is hit, dts does not provide it
222 def on_ready(*args
, **kwargs
):
225 self
.vnf_test_subscriber
= yield from self
.dts_sub
.register(
226 self
.vnf_handler
.xpath(),
227 handler
=rift
.tasklets
.DTS
.RegistrationHandler(
228 on_ready
=on_ready
, on_prepare
=on_prepare
230 flags
=rwdts
.Flag
.SUBSCRIBER | rwdts
.Flag
.CACHE
,
233 yield from ready_event
.wait()
235 def get_ns_mon_param_msgs(self
):
236 return self
.ns_handler
.get_nsr_mon_param_msgs({'1':['1']})
238 @rift.test
.dts
.async_test
239 def _test_vnf_handler_registration(self
):
240 yield from self
.vnf_handler
.register()
241 published_xpaths
= yield from self
.get_published_xpaths()
242 assert self
.vnf_handler
.xpath() in published_xpaths
244 @rift.test
.dts
.async_test
245 def _test_add_vnf_mon_params(self
):
246 yield from self
.register_vnf_publisher()
247 self
.add_param_to_publisher()
249 yield from self
.register_vnf_test_subscriber()
250 self
.add_param_to_publisher()
252 # RIFT-12888: Elements do not go immediately into cache after on_prepare.
253 # Because of this, we can't guarantee that the second param will actually be
255 elements
= list(self
.vnf_test_subscriber
.elements
)
256 assert len(elements
) > 0
257 for element
in elements
:
258 assert element
in self
.msg_gen
.mon_param_msgs
260 @rift.test
.dts
.async_test
261 def _test_nsr_handler_registration(self
):
262 yield from self
.nsr_handler
.register()
263 published_xpaths
= yield from self
.get_published_xpaths()
264 assert self
.nsr_handler
.xpath() in published_xpaths
266 def _test_publish(self
, aggregation_type
, expected_value
, legacy
=False):
268 self
.msg_gen
= MonParamMsgGenerator(4)
269 store
= self
.setup_mock_store(aggregation_type
=aggregation_type
,
270 monps
=self
.msg_gen
.mon_param_msgs
,
273 self
.vnf_handler
= vnf_mon_params
.VnfMonitorDtsHandler(
274 self
.log
, self
.dts
, self
.loop
, 1, "1.1.1.1",
275 self
.msg_gen
.mon_param_msgs
, self
.msg_gen
.endpoint_msgs
278 self
.nsr_handler
= nsr_mon_params
.NsrMonitorDtsHandler(
279 self
.log
, self
.dts
, self
.loop
, store
.nsr
[0], [store
.get_vnfr()], store
)
282 yield from self
.nsr_handler
.register()
283 yield from self
.nsr_handler
.start()
284 published_xpaths
= yield from self
.get_published_xpaths()
286 yield from self
.register_vnf_publisher()
287 self
.add_param_to_publisher()
288 self
.add_param_to_publisher()
290 nsr_id
= store
.get_nsr().ns_instance_config_ref
292 yield from asyncio
.sleep(5, loop
=self
.loop
)
294 itr
= yield from self
.dts
.query_read(self
.nsr_handler
.xpath(),
295 rwdts
.XactFlag
.MERGE
)
300 result
= yield from res
301 nsr_monp
= result
.result
302 values
.append(nsr_monp
.value_integer
)
305 assert expected_value
in values
307 @rift.test
.dts
.async_test
308 def _test_nsr_monitor_publish_avg(self
):
309 yield from self
._test
_publish
("AVERAGE", 1)
311 @rift.test
.dts
.async_test
312 def _test_nsr_monitor_publish_sum(self
):
313 yield from self
._test
_publish
("SUM", 3)
316 @rift.test
.dts
.async_test
317 def _test_nsr_monitor_publish_max(self
):
318 yield from self
._test
_publish
("MAXIMUM", 2)
320 @rift.test
.dts
.async_test
321 def _test_nsr_monitor_publish_min(self
):
322 yield from self
._test
_publish
("MINIMUM", 1)
324 @rift.test
.dts
.async_test
325 def test_nsr_monitor_publish_count(self
):
326 yield from self
._test
_publish
("COUNT", 2)
328 @rift.test
.dts
.async_test
329 def test_legacy_nsr_monitor_publish_avg(self
):
330 yield from self
._test
_publish
("AVERAGE", 1, legacy
=True)
335 runner
= xmlrunner
.XMLTestRunner(output
=os
.environ
["RIFT_MODULE_TEST"])
337 parser
= argparse
.ArgumentParser()
338 parser
.add_argument('-v', '--verbose', action
='store_true')
339 parser
.add_argument('-n', '--no-runner', action
='store_true')
340 args
, unittest_args
= parser
.parse_known_args()
344 MonParamsDtsTestCase
.log_level
= logging
.DEBUG
if args
.verbose
else logging
.WARN
346 unittest
.main(testRunner
=runner
, argv
=[sys
.argv
[0]] + unittest_args
)
348 if __name__
== '__main__':