X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=rwlaunchpad%2Fplugins%2Frwmonparam%2Ftest%2Futest_mon_params.py;h=4836bf4f995a0d65cdb8ee3506f4407e435fb1e2;hb=f314b4af9744068a7ed7a6a6314220c3aa857523;hp=a0817d7d68a5f21e3052bf88c7b72c8034022826;hpb=255ff03a528a3090ce7f46f0a63b65da3e6f9bcf;p=osm%2FSO.git diff --git a/rwlaunchpad/plugins/rwmonparam/test/utest_mon_params.py b/rwlaunchpad/plugins/rwmonparam/test/utest_mon_params.py index a0817d7d..4836bf4f 100755 --- a/rwlaunchpad/plugins/rwmonparam/test/utest_mon_params.py +++ b/rwlaunchpad/plugins/rwmonparam/test/utest_mon_params.py @@ -28,6 +28,7 @@ import tornado.testing import tornado.web import unittest import xmlrunner +import xmltodict, json import rift.tasklets.rwmonparam.vnfr_core as mon_params @@ -53,7 +54,7 @@ class MonParamsPingStatsTest(AsyncioTornadoTest): 'ping-response-rx-count': 10 } - mon_param_msg = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam() + mon_param_msg = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam() mon_param_msg.from_dict({ 'id': '1', 'name': 'ping-request-tx-count', @@ -66,7 +67,7 @@ class MonParamsPingStatsTest(AsyncioTornadoTest): 'units': 'packets' }) - endpoint_msg = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint() + endpoint_msg = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_HttpEndpoint() endpoint_msg.from_dict({ 'path': ping_path, 'polling_interval_secs': 1, @@ -230,7 +231,7 @@ class MonParamsPingStatsHttpsTest(AsyncioTornadoHttpsTest): 'ping-response-rx-count': 10 } - mon_param_msg = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_MonitoringParam() + mon_param_msg = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_MonitoringParam() mon_param_msg.from_dict({ 'id': '1', 'name': 'ping-request-tx-count', @@ -243,7 +244,7 @@ class MonParamsPingStatsHttpsTest(AsyncioTornadoHttpsTest): 'units': 'packets' }) - endpoint_msg = VnfrYang.YangData_Vnfr_VnfrCatalog_Vnfr_HttpEndpoint() + endpoint_msg = VnfrYang.YangData_RwProject_Project_VnfrCatalog_Vnfr_HttpEndpoint() endpoint_msg.from_dict({ 'path': ping_path, 'https': 'true', @@ -918,6 +919,51 @@ class PortLatencyTest(unittest.TestCase): self.assertEqual(value, 12112) +class vCPEStatTest(unittest.TestCase): + system_response = {"timestamp":1473455051, + "applicationLoad":[ +{ "service":"RIF", "instance":1, "gtpMessagesPerSec":0}, +{"service":"CPE", "instance":1, "tps":0}, +{"service":"DPE", "instance":1, "uplinkThroughput4G":0, "downlinkThroughput4G":0, "numDefaultBearers":0, "numDedicatedBearers":0 }, +{"service":"VEM", "instance":1 }, +{"service":"CDF", "instance":1, "tps":0}, +{"service":"S6A", "instance":1, "tps":0}, +{"service":"SDB", "instance":1, "queriesPerSec":0 }], + "resourceLoad":[ +{ "service":"RIF", "instance":1, "cpu":0, "mem":18, "compCpu":0 }, +{ "service":"CPE", "instance":1, "cpu":0, "mem":26, "compCpu":0 }, +{ "service":"DPE", "instance":1, "cpu":0, "mem":31, "compCpu":0 }, +{ "service":"VEM", "instance":1, "cpu":1, "mem":34, "compCpu":0 }, +{ "service":"CDF", "instance":1, "cpu":0, "mem":18, "compCpu":0 }, +{ "service":"S6A", "instance":1, "cpu":1, "mem":21, "compCpu":0 }, +{ "service":"SDB", "instance":1, "memUsedByData":255543560, "swapUtil":0, "swapTotal":3689934848, "swapUsed":0,"memUtil":0, "memTotal":12490944512, "memFree":10986942464, "cpu":2}] } + + + def test_object_path_value_querier(self): + kv_querier = mon_params.ObjectPathValueQuerier(logger, "$.applicationLoad[@.service is 'DPE'].uplinkThroughput4G") + value = kv_querier.query(tornado.escape.json_encode(self.system_response)) + self.assertEqual(value, 0) + kv_querier = mon_params.ObjectPathValueQuerier(logger, "$.resourceLoad[@.service is 'DPE'].mem") + value = kv_querier.query(tornado.escape.json_encode(self.system_response)) + self.assertEqual(value, 31) + + +class XMLReponseTest(unittest.TestCase): + xml_response = " 2 1 0 0 " + try: + op = xmltodict.parse(xml_response) + x=json.dumps(op) + y=json.loads(x) + system_response = y + except Exception as e: + print("Input is Not XML formatted") + pass + + def test_object_path_value_querier(self): + kv_querier = mon_params.ObjectPathValueQuerier(logger, "$.response.result.entry.current") + value = kv_querier.query(tornado.escape.json_encode(self.system_response)) + self.assertEqual(value, '2') + def main(argv=sys.argv[1:]): # The unittest framework requires a program name, so use the name of this