update from RIFT as of 696b75d2fe9fb046261b08c616f1bcf6c0b54a9b second try
[osm/SO.git] / rwlaunchpad / plugins / rwmonparam / test / utest_mon_params_dts.py
1 #!/usr/bin/env python3
2
3 #
4 # Copyright 2016 RIFT.IO Inc
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 #
18
19 import argparse
20 import asyncio
21 import itertools
22 import logging
23 import os
24 import sys
25 import unittest
26 import uuid
27
28 import xmlrunner
29 import unittest.mock as mock
30
31 from rift.tasklets.rwmonparam import vnfr_core as vnf_mon_params
32 from rift.tasklets.rwmonparam import nsr_core as nsr_mon_params
33 import rift.test.dts
34
35 import gi
36 gi.require_version('RwDtsYang', '1.0')
37 from gi.repository import (
38 VnfrYang as vnfryang,
39 RwNsrYang,
40 RwLaunchpadYang as launchpadyang,
41 RwDts as rwdts,
42 RwVnfrYang,
43 RwProjectVnfdYang as RwVnfdYang,
44 RwProjectNsdYang as RwNsdYang,
45 )
46
47 import utest_mon_params
48
49
class MonParamMsgGenerator(object):
    """Factory for the canned VNFR messages used by the DTS tests.

    On construction it builds one ``HttpEndpoint`` message for the ping
    statistics path plus a list of ``MonitoringParam`` messages that refer
    to that endpoint.  Parameter ids start at 1 and stop *before*
    ``num_messages``, i.e. ``num_messages - 1`` messages are created (none
    at all for the default value of 1).
    """

    def __init__(self, num_messages=1):
        stats_path = r"/api/v1/ping/stats"

        endpoint_fields = {
            'path': stats_path,
            'https': 'true',
            'polling_interval_secs': 1,
            'username': 'admin',
            'password': 'password',
            'headers': [{'key': 'TEST_KEY', 'value': 'TEST_VALUE'}],
        }
        self._endpoint_msg = (
            vnfryang.YangData_RwProject_Project_VnfrCatalog_Vnfr_HttpEndpoint
            .from_dict(endpoint_fields)
        )

        def param_fields(index):
            # Dictionary describing the monitoring param with the given index;
            # the index doubles as the integer value carried by the message.
            return {
                'id': '%s' % index,
                'name': 'param_num_%s' % index,
                'json_query_method': "NAMEKEY",
                'http_endpoint_ref': stats_path,
                'value_type': "INT",
                'value_integer': index,
                'description': 'desc for param_num_%s' % index,
                'group_tag': 'Group1',
                'widget_type': 'COUNTER',
                'units': 'packets'
            }

        self._mon_param_msgs = [
            vnfryang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam
            .from_dict(param_fields(index))
            for index in range(1, num_messages)
        ]

        # Single-pass iterator consumed by next_message().
        self._msgs = iter(self.mon_param_msgs)

    @property
    def mon_param_msgs(self):
        """List of every monitoring-param message built by the constructor."""
        return self._mon_param_msgs

    @property
    def endpoint_msgs(self):
        """One-element list holding the HTTP endpoint message."""
        return [self._endpoint_msg]

    def next_message(self):
        """Return the next not-yet-consumed monitoring-param message.

        Raises:
            StopIteration: once all messages have been handed out.
        """
        return next(self._msgs)
90
91
92
class MonParamsDtsTestCase(rift.test.dts.AbstractDTSTest):
    """DTS tests for the VNFR and NSR monitoring-parameter handlers.

    Each test gets two DTS handles (a publisher side and a ``_sub``
    subscriber side), a ``VnfMonitorDtsHandler`` fed from a
    ``MonParamMsgGenerator`` and an ``NsrMonitorDtsHandler`` backed by a
    mocked store (see ``setup_mock_store``).

    Methods whose names start with ``_test_`` do not match unittest's
    default ``test`` prefix, so they are not collected by ``unittest.main``.
    """

    @classmethod
    def configure_schema(cls):
        """Return the launchpad yang schema used for the test DTS router."""
        return launchpadyang.get_schema()

    @classmethod
    def configure_timeout(cls):
        """Return the timeout value for the test case (480)."""
        return 480

    def configure_test(self, loop, test_id):
        """Create the DTS handles and the default VNF/NSR handlers.

        Args:
            loop: event loop supplied by the framework (``self.loop`` is
                what is actually used below).
            test_id: identifier of the running test; used to name the
                tasklet infos.
        """
        self.log.debug("STARTING - %s", test_id)
        self.tinfo = self.new_tinfo(str(test_id))
        self.dts = rift.tasklets.DTS(self.tinfo, self.schema, self.loop)

        self.tinfo_sub = self.new_tinfo(str(test_id) + "_sub")
        self.dts_sub = rift.tasklets.DTS(self.tinfo_sub, self.schema, self.loop)

        self.msg_gen = MonParamMsgGenerator(4)
        self.vnf_handler = vnf_mon_params.VnfMonitorDtsHandler(
            self.log, self.dts, self.loop, 1, "1.1.1.1",
            self.msg_gen.mon_param_msgs, self.msg_gen.endpoint_msgs
        )

        store = self.setup_mock_store(aggregation_type=None,
                                      monps=None,
                                      legacy=True)

        self.nsr_handler = nsr_mon_params.NsrMonitorDtsHandler(
            self.log, self.dts, self.loop, store.nsr[0], [store.get_vnfr()], store)

    def tearDown(self):
        super().tearDown()

    def setup_mock_store(self, aggregation_type, monps, legacy=False):
        """Build a ``MagicMock`` store exposing one VNFD, VNFR, NSR and NSD.

        Args:
            aggregation_type: value placed in the NSD monitoring param's
                ``aggregation_type`` field (only used when not ``legacy``).
            monps: iterable of monitoring-param messages copied into the
                mock VNFR when not ``legacy``; ``None`` is treated as empty.
            legacy: when true, both the VNFR and the NSD are created with
                an empty ``monitoring_param`` list.

        Returns:
            The mock store, with ``get_vnfd``/``get_vnfr``/``get_nsr``/
            ``get_nsd`` returning the created messages and ``nsr`` holding
            a one-element list with the NSR.
        """
        store = mock.MagicMock()

        mock_vnfd = RwVnfdYang.YangData_RwProject_Project_VnfdCatalog_Vnfd.from_dict({
            'id': "1",
            'monitoring_param': [
                {'description': 'no of ping requests',
                 'group_tag': 'Group1',
                 'http_endpoint_ref': 'api/v1/ping/stats',
                 'id': '1',
                 'json_query_method': 'NAMEKEY',
                 'name': 'ping-request-tx-count',
                 'units': 'packets',
                 'value_type': 'INT',
                 'widget_type': 'COUNTER'},
                {'description': 'no of ping responses',
                 'group_tag': 'Group1',
                 'http_endpoint_ref': 'api/v1/ping/stats',
                 'id': '2',
                 'json_query_method': 'NAMEKEY',
                 'name': 'ping-response-rx-count',
                 'units': 'packets',
                 'value_type': 'INT',
                 'widget_type': 'COUNTER'}],
        })
        store.get_vnfd = mock.MagicMock(return_value=mock_vnfd)

        # A missing ``monps`` is treated as "no params" instead of raising
        # TypeError when the non-legacy branch iterates over it.
        vnfr_monps = [] if legacy else [monp.as_dict() for monp in (monps or [])]
        mock_vnfr = RwVnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr.from_dict({
            'id': '1',
            'monitoring_param': vnfr_monps,
        })
        mock_vnfr.vnfd = vnfryang.YangData_RwProject_Project_VnfrCatalog_Vnfr_Vnfd.from_dict({'id': '1'})
        store.get_vnfr = mock.MagicMock(return_value=mock_vnfr)

        mock_nsr = RwNsrYang.YangData_RwProject_Project_NsInstanceOpdata_Nsr.from_dict({
            'ns_instance_config_ref': "1",
            'name_ref': "Foo",
            'constituent_vnfr_ref': [{'vnfr_id': mock_vnfr.id}],
        })
        store.get_nsr = mock.MagicMock(return_value=mock_nsr)
        store.nsr = [mock_nsr]

        # NSD-level param that refers to both VNFD monitoring params above.
        monp = [{'aggregation_type': aggregation_type,
                 'id': '1',
                 'description': 'no of ping requests',
                 'group_tag': 'Group1',
                 'units': 'packets',
                 'widget_type': 'COUNTER',
                 'name': 'ping-request-tx-count',
                 'value_type': 'INT',
                 'vnfd_monitoring_param': [
                     {'vnfd_id_ref': '1',
                      'vnfd_monitoring_param_ref': '1'},
                     {'vnfd_id_ref': '1',
                      'vnfd_monitoring_param_ref': '2'}]
                 }]

        mock_nsd = RwNsdYang.YangData_RwProject_Project_NsdCatalog_Nsd.from_dict({
            'id': str(uuid.uuid1()),
            'monitoring_param': (monp if not legacy else [])
        })

        store.get_nsd = mock.MagicMock(return_value=mock_nsd)

        return store

    @asyncio.coroutine
    def get_published_xpaths(self):
        """Return the set of keyspecs registered with the "publisher" flag.

        The information is read from the ``D,/rwdts:dts`` state path.
        """
        published_xpaths = set()

        res_iter = yield from self.dts.query_read("D,/rwdts:dts")
        for i in res_iter:
            res = (yield from i).result
            for member in res.member:
                published_xpaths.update(
                    reg.keyspec
                    for reg in member.state.registration
                    if reg.flags == "publisher"
                )

        return published_xpaths

    @asyncio.coroutine
    def register_vnf_publisher(self):
        """Register the VNF monitoring-param handler with DTS."""
        yield from self.vnf_handler.register()

    def add_param_to_publisher(self, publisher):
        """Feed the generator's next message to ``publisher`` and return it."""
        msg = self.msg_gen.next_message()
        publisher.on_update_mon_params([msg])
        return msg

    @asyncio.coroutine
    def register_vnf_test_subscriber(self, on_prepare=None):
        """Register a caching subscriber on the VNF handler's xpath.

        The registration is stored in ``self.vnf_test_subscriber``; the
        coroutine returns only after the registration's ``on_ready``
        callback has fired.

        Args:
            on_prepare: optional callback forwarded to the registration
                handler.
        """
        ready_event = asyncio.Event(loop=self.loop)

        # Register needs to wait till reg-ready is hit, dts does not provide it
        # out-of-the-box.
        @asyncio.coroutine
        def on_ready(*args, **kwargs):
            ready_event.set()

        self.vnf_test_subscriber = yield from self.dts_sub.register(
            self.vnf_handler.xpath(),
            handler=rift.tasklets.DTS.RegistrationHandler(
                on_ready=on_ready, on_prepare=on_prepare
            ),
            flags=rwdts.Flag.SUBSCRIBER | rwdts.Flag.CACHE,
        )

        yield from ready_event.wait()

    def get_ns_mon_param_msgs(self):
        """Ask the NSR handler for its monitoring-param messages.

        NOTE(review): the original read ``self.ns_handler``, an attribute
        that is never assigned in this class; ``self.nsr_handler`` is the
        handler created in ``configure_test``.
        """
        return self.nsr_handler.get_nsr_mon_param_msgs({'1': ['1']})

    @asyncio.coroutine
    def _read_nsr_monp_values(self):
        """Read the NSR handler's xpath and return every ``value_integer``."""
        itr = yield from self.dts.query_read(self.nsr_handler.xpath(),
                                             rwdts.XactFlag.MERGE)

        values = []
        for res in itr:
            result = yield from res
            values.append(result.result.value_integer)

        return values

    @rift.test.dts.async_test
    def _test_vnf_handler_registration(self):
        yield from self.vnf_handler.register()
        published_xpaths = yield from self.get_published_xpaths()
        self.assertIn(self.vnf_handler.xpath(), published_xpaths)

    @rift.test.dts.async_test
    def _test_add_vnf_mon_params(self):
        yield from self.register_vnf_publisher()
        self.add_param_to_publisher(self.vnf_handler)

        yield from self.register_vnf_test_subscriber()
        self.add_param_to_publisher(self.vnf_handler)

        # RIFT-12888: Elements do not go immediately into cache after on_prepare.
        # Because of this, we can't guarantee that the second param will actually be
        # in the cache yet.
        elements = list(self.vnf_test_subscriber.elements)
        self.assertGreater(len(elements), 0)
        for element in elements:
            self.assertIn(element, self.msg_gen.mon_param_msgs)

    @rift.test.dts.async_test
    def _test_nsr_handler_registration(self):
        yield from self.nsr_handler.register()
        published_xpaths = yield from self.get_published_xpaths()
        self.assertIn(self.nsr_handler.xpath(), published_xpaths)

    @asyncio.coroutine
    def _test_publish(self, aggregation_type, expected_value, legacy=False):
        """Publish two VNF param updates and check the NSR-level value.

        Rebuilds ``self.msg_gen``, ``self.vnf_handler`` and
        ``self.nsr_handler`` around a fresh mock store, pushes the first two
        generated messages through the VNF handler, waits two seconds and
        asserts that ``expected_value`` is among the ``value_integer``
        fields read back from the NSR handler's xpath.

        Args:
            aggregation_type: aggregation type put on the NSD param.
            expected_value: integer expected among the published values.
            legacy: forwarded to ``setup_mock_store``.
        """
        self.msg_gen = MonParamMsgGenerator(5)
        store = self.setup_mock_store(aggregation_type=aggregation_type,
                                      monps=self.msg_gen.mon_param_msgs,
                                      legacy=legacy)

        self.vnf_handler = vnf_mon_params.VnfMonitorDtsHandler(
            self.log, self.dts, self.loop, 1, "1.1.1.1",
            self.msg_gen.mon_param_msgs, self.msg_gen.endpoint_msgs
        )

        self.nsr_handler = nsr_mon_params.NsrMonitorDtsHandler(
            self.log, self.dts, self.loop, store.nsr[0], [store.get_vnfr()], store)

        yield from self.nsr_handler.register()
        yield from self.nsr_handler.start()
        # The result is not used; the read is kept so the sequence of DTS
        # operations is the same as before.
        yield from self.get_published_xpaths()

        yield from self.register_vnf_publisher()
        self.add_param_to_publisher(self.vnf_handler)
        self.add_param_to_publisher(self.vnf_handler)

        yield from asyncio.sleep(2, loop=self.loop)

        values = yield from self._read_nsr_monp_values()

        self.log.debug("NSR monitoring param values: %s", values)
        self.assertIn(expected_value, values)

    @rift.test.dts.async_test
    def _test_nsr_monitor_publish_avg(self):
        yield from self._test_publish("AVERAGE", 1)

    @rift.test.dts.async_test
    def _test_nsr_monitor_publish_sum(self):
        yield from self._test_publish("SUM", 3)

    @rift.test.dts.async_test
    def _test_nsr_monitor_publish_max(self):
        yield from self._test_publish("MAXIMUM", 2)

    @rift.test.dts.async_test
    def _test_nsr_monitor_publish_min(self):
        yield from self._test_publish("MINIMUM", 1)

    @rift.test.dts.async_test
    def test_nsr_monitor_publish_count(self):
        yield from self._test_publish("COUNT", 2)

    @rift.test.dts.async_test
    def test_legacy_nsr_monitor_publish_avg(self):
        yield from self._test_publish("AVERAGE", 1, legacy=True)

    @rift.test.dts.async_test
    def test_vnfr_add_delete(self):
        """Add a second VNFR to the NSR handler, then delete it again.

        After each step the poller bookkeeping and the published SUM value
        are checked.
        """
        yield from self._test_publish("SUM", 3)

        self.msg_gen = MonParamMsgGenerator(5)
        store = self.setup_mock_store(aggregation_type="SUM",
                                      monps=self.msg_gen.mon_param_msgs)
        new_vnf_handler = vnf_mon_params.VnfMonitorDtsHandler(
            self.log, self.dts, self.loop, 2, "2.2.2.1",
            self.msg_gen.mon_param_msgs, self.msg_gen.endpoint_msgs
        )
        yield from new_vnf_handler.register()

        # add a new vnfr
        new_vnfr = store.get_vnfr()
        new_vnfr.id = '2'
        yield from self.nsr_handler.update([new_vnfr])

        # check if the newly created one has been added in the model
        poller = self.nsr_handler.mon_params_pollers[0]
        self.assertEqual(len(poller.monp.nsr_mon_param_msg.vnfr_mon_param_ref), 4)
        self.assertEqual(len(poller.subscribers), 4)
        self.assertEqual(len(poller.monp.vnfr_monparams), 4)

        # publish new values
        yield from asyncio.sleep(2, loop=self.loop)
        self.add_param_to_publisher(new_vnf_handler)
        self.add_param_to_publisher(new_vnf_handler)
        yield from asyncio.sleep(3, loop=self.loop)

        values = yield from self._read_nsr_monp_values()

        # Fail with a readable message rather than an IndexError when
        # nothing was published.
        self.assertTrue(values, "no NSR monitoring param value was read back")
        self.assertEqual(values[0], 6)

        # delete the VNFR
        yield from self.nsr_handler.delete([new_vnfr])

        # check that the deleted VNFR has been removed from the model
        poller = self.nsr_handler.mon_params_pollers[0]
        self.assertEqual(len(poller.monp.vnfr_monparams), 2)
        self.assertEqual(len(poller.monp.nsr_mon_param_msg.vnfr_mon_param_ref), 2)
        self.assertEqual(len(poller.subscribers), 2)

        self.msg_gen = MonParamMsgGenerator(5)
        self.add_param_to_publisher(self.vnf_handler)
        self.add_param_to_publisher(self.vnf_handler)
        yield from asyncio.sleep(2, loop=self.loop)

        values = yield from self._read_nsr_monp_values()

        self.assertTrue(values, "no NSR monitoring param value was read back")
        self.assertEqual(values[0], 3)
396
397
398
def main():
    """Parse the command line and run the test case through unittest.

    Options:
        -v / --verbose    run the test case at DEBUG log level (WARN otherwise).
        -n / --no-runner  use unittest's default runner instead of the XML
                          runner.

    Unrecognised arguments are passed on to ``unittest.main``.

    Raises:
        KeyError: if the XML runner is requested and the
            ``RIFT_MODULE_TEST`` environment variable is not set.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('-n', '--no-runner', action='store_true')
    args, unittest_args = parser.parse_known_args()

    # Build the XML runner only when it will be used, so --no-runner (and
    # --help) work without RIFT_MODULE_TEST being set.
    runner = None
    if not args.no_runner:
        runner = xmlrunner.XMLTestRunner(output=os.environ["RIFT_MODULE_TEST"])

    MonParamsDtsTestCase.log_level = logging.DEBUG if args.verbose else logging.WARN

    unittest.main(testRunner=runner, argv=[sys.argv[0]] + unittest_args)


if __name__ == '__main__':
    main()