fb0b03937f5c1b22cb93edf292fe75eadf1ae4f8
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / test / utest_mon_params_dts.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import itertools
22 import logging
23 import os
24 import sys
25 import unittest
26 import uuid
27
28 import xmlrunner
29 import unittest.mock as mock
30
31 from rift.tasklets.rwmonparam import vnfr_core as vnf_mon_params
32 from rift.tasklets.rwmonparam import nsr_core as nsr_mon_params
33 import rift.test.dts
34
35 import gi
36 gi.require_version('RwDtsYang', '1.0')
37 from gi.repository import (
38 VnfrYang as vnfryang,
39 RwNsrYang,
40 RwLaunchpadYang as launchpadyang,
41 RwDts as rwdts,
42 RwVnfrYang,
43 RwVnfdYang,
44 RwNsdYang
45 )
46
47 import utest_mon_params
48
49
class MonParamMsgGenerator(object):
    """Factory for the canned VNFR messages used by the monitoring-param tests.

    On construction it builds one HTTP endpoint message and a list of
    monitoring-param messages that all reference that endpoint's path.

    NOTE: the params are numbered ``1 .. num_messages - 1`` (the range is
    exclusive at the top), so ``num_messages=4`` produces three messages and
    the default of ``1`` produces none.
    """

    def __init__(self, num_messages=1):
        """Build the endpoint message and the monitoring-param messages.

        Args:
            num_messages: exclusive upper bound of the 1-based param ids to
                generate.
        """
        stats_path = r"/api/v1/ping/stats"

        endpoint_fields = {
            'path': stats_path,
            'https': 'true',
            'polling_interval_secs': 1,
            'username': 'admin',
            'password': 'password',
            'headers': [{'key': 'TEST_KEY', 'value': 'TEST_VALUE'}],
        }
        self._endpoint_msg = (
            vnfryang.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint.from_dict(
                endpoint_fields))

        monp_cls = vnfryang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam
        # Each param carries its own index as both its id and its integer value.
        self._mon_param_msgs = [
            monp_cls.from_dict({
                'id': '%s' % idx,
                'name': 'param_num_%s' % idx,
                'json_query_method': "NAMEKEY",
                'http_endpoint_ref': stats_path,
                'value_type': "INT",
                'value_integer': idx,
                'description': 'desc for param_num_%s' % idx,
                'group_tag': 'Group1',
                'widget_type': 'COUNTER',
                'units': 'packets'
            })
            for idx in range(1, num_messages)
        ]

        # One-shot cursor over the generated params, consumed by next_message().
        self._msgs = iter(self.mon_param_msgs)

    @property
    def mon_param_msgs(self):
        """The full list of generated monitoring-param messages."""
        return self._mon_param_msgs

    @property
    def endpoint_msgs(self):
        """A single-element list holding the HTTP endpoint message."""
        return [self._endpoint_msg]

    def next_message(self):
        """Return the next not-yet-consumed monitoring-param message.

        Raises:
            StopIteration: once every generated message has been handed out.
        """
        return next(self._msgs)
90
91
92
class MonParamsDtsTestCase(rift.test.dts.AbstractDTSTest):
    """DTS-level tests for the VNFR and NSR monitoring-param handlers.

    Methods whose names start with ``_test_`` do not match unittest's default
    ``test`` method prefix, so the default loader does not collect them; only
    ``test_nsr_monitor_publish_count`` and
    ``test_legacy_nsr_monitor_publish_avg`` run as tests.
    """

    @classmethod
    def configure_schema(cls):
        """Return the launchpad YANG schema used for this test case."""
        return launchpadyang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """Return the test timeout (240; presumably seconds -- confirm)."""
        return 240

    def configure_test(self, loop, test_id):
        """Create the DTS handles and default VNF/NSR handlers for one test.

        Two DTS instances are created: ``self.dts`` (used by the handlers) and
        ``self.dts_sub`` (used by the test subscriber). The default NSR handler
        is built on a legacy mock store, i.e. one without NSD monitoring
        params.

        Args:
            loop: event loop supplied by the caller (``self.loop`` is what the
                body actually uses).
            test_id: identifier used to name the tasklet infos and log lines.
        """
        self.log.debug("STARTING - %s", test_id)
        self.tinfo = self.new_tinfo(str(test_id))
        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)

        self.tinfo_sub = self.new_tinfo(str(test_id) + "_sub")
        self.dts_sub = rift.tasklets.DTS(self.tinfo_sub, self.schema, self.loop)

        self.msg_gen = MonParamMsgGenerator(4)
        # NOTE(review): 1 and "1.1.1.1" look like a VNFR id and a management
        # IP -- confirm against VnfMonitorDtsHandler's signature.
        self.vnf_handler = vnf_mon_params.VnfMonitorDtsHandler(
            self.log, self.dts, self.loop, 1, "1.1.1.1",
            self.msg_gen.mon_param_msgs, self.msg_gen.endpoint_msgs
        )

        store = self.setup_mock_store(aggregation_type=None,
                                      monps=None,
                                      legacy=True)

        self.nsr_handler = nsr_mon_params.NsrMonitorDtsHandler(
            self.log, self.dts, self.loop, store.nsr[0], [store.get_vnfr()], store)

    def tearDown(self):
        """Delegate teardown to the base class."""
        super().tearDown()

    def setup_mock_store(self, aggregation_type, monps, legacy=False):
        """Build a MagicMock store populated with VNFD/VNFR/NSR/NSD fixtures.

        Args:
            aggregation_type: value placed in the NSD monitoring param's
                ``aggregation_type`` field.
            monps: iterable of messages exposing ``as_dict()``; copied into the
                mock VNFR's ``monitoring_param`` list. ``None`` is treated as
                empty.
            legacy: when true, both the VNFR and the NSD are created with no
                monitoring params.

        Returns:
            A ``mock.MagicMock`` whose ``get_vnfd``, ``get_vnfr``, ``get_nsr``
            and ``get_nsd`` return the fixtures, and whose ``nsr`` attribute is
            a one-element list holding the mock NSR.
        """
        store = mock.MagicMock()

        mock_vnfd = RwVnfdYang.YangData_Vnfd_VnfdCatalog_Vnfd.from_dict({
            'id': "1",
            'monitoring_param': [
                {'description': 'no of ping requests',
                 'group_tag': 'Group1',
                 'http_endpoint_ref': 'api/v1/ping/stats',
                 'id': '1',
                 'json_query_method': 'NAMEKEY',
                 'name': 'ping-request-tx-count',
                 'units': 'packets',
                 'value_type': 'INT',
                 'widget_type': 'COUNTER'},
                {'description': 'no of ping responses',
                 'group_tag': 'Group1',
                 'http_endpoint_ref': 'api/v1/ping/stats',
                 'id': '2',
                 'json_query_method': 'NAMEKEY',
                 'name': 'ping-response-rx-count',
                 'units': 'packets',
                 'value_type': 'INT',
                 'widget_type': 'COUNTER'}],
        })
        store.get_vnfd = mock.MagicMock(return_value=mock_vnfd)

        # Guard against monps=None so a non-legacy call without params yields
        # an empty list instead of a TypeError.
        if legacy or monps is None:
            vnfr_monps = []
        else:
            vnfr_monps = [monp.as_dict() for monp in monps]

        mock_vnfr = RwVnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr.from_dict({
            'id': '1',
            'monitoring_param': vnfr_monps
        })
        mock_vnfr.vnfd = vnfryang.YangData_Vnfr_VnfrCatalog_Vnfr_Vnfd.from_dict({'id': '1'})
        store.get_vnfr = mock.MagicMock(return_value=mock_vnfr)

        mock_nsr = RwNsrYang.YangData_Nsr_NsInstanceOpdata_Nsr.from_dict({
            'ns_instance_config_ref': "1",
            'name_ref': "Foo",
            'constituent_vnfr_ref': [{'vnfr_id': mock_vnfr.id}],
        })
        store.get_nsr = mock.MagicMock(return_value=mock_nsr)
        store.nsr = [mock_nsr]

        # Single NSD-level param that refers to both VNFD params ('1' and '2').
        nsd_monps = [{'aggregation_type': aggregation_type,
                      'id': '1',
                      'description': 'no of ping requests',
                      'group_tag': 'Group1',
                      'units': 'packets',
                      'widget_type': 'COUNTER',
                      'name': 'ping-request-tx-count',
                      'value_type': 'INT',
                      'vnfd_monitoring_param': [
                          {'vnfd_id_ref': '1',
                           'vnfd_monitoring_param_ref': '1'},
                          {'vnfd_id_ref': '1',
                           'vnfd_monitoring_param_ref': '2'}]
                      }]

        mock_nsd = RwNsdYang.YangData_Nsd_NsdCatalog_Nsd.from_dict({
            'id': str(uuid.uuid1()),
            'monitoring_param': (nsd_monps if not legacy else [])
        })

        store.get_nsd = mock.MagicMock(return_value=mock_nsd)

        return store

    @asyncio.coroutine
    def get_published_xpaths(self):
        """Return the set of keyspecs that DTS members registered as publisher.

        Reads ``D,/rwdts:dts`` and, for every member in every result, collects
        the ``keyspec`` of each registration whose ``flags`` equals
        ``"publisher"``.
        """
        published_xpaths = set()

        res_iter = yield from self.dts.query_read("D,/rwdts:dts")
        for i in res_iter:
            res = (yield from i).result
            for member in res.member:
                published_xpaths |= {
                    reg.keyspec
                    for reg in member.state.registration
                    if reg.flags == "publisher"
                }

        return published_xpaths

    @asyncio.coroutine
    def register_vnf_publisher(self):
        """Register the current VNF handler with DTS."""
        yield from self.vnf_handler.register()

    def add_param_to_publisher(self):
        """Feed the next generated message to the VNF handler and return it."""
        msg = self.msg_gen.next_message()
        self.vnf_handler.on_update_mon_params([msg])
        return msg

    @asyncio.coroutine
    def register_vnf_test_subscriber(self, on_prepare=None):
        """Register a caching subscriber on the VNF handler's xpath.

        The registration is stored in ``self.vnf_test_subscriber``; the
        coroutine returns only after the registration's ``on_ready`` callback
        has fired.

        Args:
            on_prepare: optional callback forwarded to the registration
                handler.
        """
        ready_event = asyncio.Event(loop=self.loop)

        # Register needs to wait till reg-ready is hit, dts does not provide it
        # out-of-the-box.
        @asyncio.coroutine
        def on_ready(*args, **kwargs):
            ready_event.set()

        self.vnf_test_subscriber = yield from self.dts_sub.register(
            self.vnf_handler.xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_ready=on_ready, on_prepare=on_prepare
            ),
            flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.CACHE,
        )

        yield from ready_event.wait()

    def get_ns_mon_param_msgs(self):
        """Return the NSR handler's monitoring-param messages.

        NOTE(review): the ``{'1': ['1']}`` argument presumably maps a VNFR id
        to monitoring-param ids -- confirm against NsrMonitorDtsHandler.
        """
        # The handler is stored as ``nsr_handler``; ``ns_handler`` is never
        # assigned in this class, so the old name raised AttributeError.
        return self.nsr_handler.get_nsr_mon_param_msgs({'1': ['1']})

    @rift.test.dts.async_test
    def _test_vnf_handler_registration(self):
        """The VNF handler's xpath shows up among the published xpaths."""
        yield from self.vnf_handler.register()
        published_xpaths = yield from self.get_published_xpaths()
        assert self.vnf_handler.xpath() in published_xpaths

    @rift.test.dts.async_test
    def _test_add_vnf_mon_params(self):
        """Params handed to the publisher become visible to a subscriber."""
        yield from self.register_vnf_publisher()
        self.add_param_to_publisher()

        yield from self.register_vnf_test_subscriber()
        self.add_param_to_publisher()

        # RIFT-12888: Elements do not go immediately into cache after on_prepare.
        # Because of this, we can't guarantee that the second param will actually be
        # in the cache yet.
        elements = list(self.vnf_test_subscriber.elements)
        assert len(elements) > 0
        for element in elements:
            assert element in self.msg_gen.mon_param_msgs

    @rift.test.dts.async_test
    def _test_nsr_handler_registration(self):
        """The NSR handler's xpath shows up among the published xpaths."""
        yield from self.nsr_handler.register()
        published_xpaths = yield from self.get_published_xpaths()
        assert self.nsr_handler.xpath() in published_xpaths

    @asyncio.coroutine
    def _test_publish(self, aggregation_type, expected_value, legacy=False):
        """Shared body of the NSR aggregation tests (not itself a test).

        Rebuilds the message generator, mock store and both handlers, starts
        the NSR handler, passes the first two generated messages
        (``value_integer`` 1 and 2) to the VNF handler, waits, then reads the
        NSR handler's xpath and checks the published integer values.

        Args:
            aggregation_type: aggregation name stored in the NSD monitoring
                param (e.g. ``"SUM"``).
            expected_value: integer that must appear among the published
                ``value_integer`` results.
            legacy: forwarded to ``setup_mock_store``.
        """
        self.msg_gen = MonParamMsgGenerator(4)
        store = self.setup_mock_store(aggregation_type=aggregation_type,
                                      monps=self.msg_gen.mon_param_msgs,
                                      legacy=legacy)

        self.vnf_handler = vnf_mon_params.VnfMonitorDtsHandler(
            self.log, self.dts, self.loop, 1, "1.1.1.1",
            self.msg_gen.mon_param_msgs, self.msg_gen.endpoint_msgs
        )

        self.nsr_handler = nsr_mon_params.NsrMonitorDtsHandler(
            self.log, self.dts, self.loop, store.nsr[0], [store.get_vnfr()], store)

        yield from self.nsr_handler.register()
        yield from self.nsr_handler.start()
        # NOTE(review): the result is unused; the read is kept only so the
        # sequence of DTS operations matches the original test.
        yield from self.get_published_xpaths()

        yield from self.register_vnf_publisher()
        self.add_param_to_publisher()
        self.add_param_to_publisher()

        # Give the handlers time to process the updates before reading back.
        yield from asyncio.sleep(5, loop=self.loop)

        itr = yield from self.dts.query_read(self.nsr_handler.xpath(),
                                             rwdts.XactFlag.MERGE)

        values = []
        for res in itr:
            result = yield from res
            nsr_monp = result.result
            values.append(nsr_monp.value_integer)

        self.log.debug("NSR monitoring param values: %s", values)
        assert expected_value in values, (
            "expected %r among published values %r" % (expected_value, values))

    @rift.test.dts.async_test
    def _test_nsr_monitor_publish_avg(self):
        """AVERAGE aggregation is expected to publish 1."""
        yield from self._test_publish("AVERAGE", 1)

    @rift.test.dts.async_test
    def _test_nsr_monitor_publish_sum(self):
        """SUM aggregation is expected to publish 3."""
        yield from self._test_publish("SUM", 3)

    @rift.test.dts.async_test
    def _test_nsr_monitor_publish_max(self):
        """MAXIMUM aggregation is expected to publish 2."""
        yield from self._test_publish("MAXIMUM", 2)

    @rift.test.dts.async_test
    def _test_nsr_monitor_publish_min(self):
        """MINIMUM aggregation is expected to publish 1."""
        yield from self._test_publish("MINIMUM", 1)

    @rift.test.dts.async_test
    def test_nsr_monitor_publish_count(self):
        """COUNT aggregation is expected to publish 2."""
        yield from self._test_publish("COUNT", 2)

    @rift.test.dts.async_test
    def test_legacy_nsr_monitor_publish_avg(self):
        """Legacy store (no NSD params) with AVERAGE is expected to publish 1."""
        yield from self._test_publish("AVERAGE", 1, legacy=True)
331
332
333
def main():
    """Parse command-line options and run the test module.

    Options:
        -v / --verbose: set the test case log level to DEBUG (default WARN).
        -n / --no-runner: use unittest's default runner instead of the XML
            runner.

    Any arguments not recognised here are forwarded to ``unittest.main``.

    Raises:
        KeyError: if the XML runner is requested and the ``RIFT_MODULE_TEST``
            environment variable is not set.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')
    args, unittest_args = parser.parse_known_args()

    # Build the XML runner only when it will be used, so --no-runner works
    # without RIFT_MODULE_TEST being set in the environment.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    MonParamsDtsTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN

    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)


if __name__ == '__main__':
    main()