X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Ftests%2Ftest_ns.py;h=20047016645ceef321716447d82db0e6f851a823;hb=refs%2Fchanges%2F23%2F12623%2F1;hp=7613a88eeb369992b1e72d04c59e39a70af1abfb;hpb=ca7ece02d870a16bdcd47dab7c5ac30c82c58545;p=osm%2FLCM.git diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index 7613a88..2004701 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -122,19 +122,15 @@ class TestMyNS(asynctest.TestCase): print("kwargs > {}".format(kwargs)) if args: if "update" in args: - ro_ns_desc = yaml.load( - descriptors.ro_update_action_text, Loader=yaml.Loader - ) + ro_ns_desc = yaml.safe_load(descriptors.ro_update_action_text) while True: yield ro_ns_desc if kwargs.get("delete"): - ro_ns_desc = yaml.load( - descriptors.ro_delete_action_text, Loader=yaml.Loader - ) + ro_ns_desc = yaml.safe_load(descriptors.ro_delete_action_text) while True: yield ro_ns_desc - ro_ns_desc = yaml.load(descriptors.ro_ns_text, Loader=yaml.Loader) + ro_ns_desc = yaml.safe_load(descriptors.ro_ns_text) # if ip address provided, replace descriptor ip_addresses = getenv("OSMLCMTEST_NS_IPADDRESS", "") @@ -176,36 +172,26 @@ class TestMyNS(asynctest.TestCase): Database.instance = None self.db = Database({"database": {"driver": "memory"}}).instance.db - self.db.create_list( - "vnfds", yaml.load(descriptors.db_vnfds_text, Loader=yaml.Loader) - ) + self.db.create_list("vnfds", yaml.safe_load(descriptors.db_vnfds_text)) self.db.create_list( "vnfds_revisions", - yaml.load(descriptors.db_vnfds_revisions_text, Loader=yaml.Loader), - ) - self.db.create_list( - "nsds", yaml.load(descriptors.db_nsds_text, Loader=yaml.Loader) - ) - self.db.create_list( - "nsrs", yaml.load(descriptors.db_nsrs_text, Loader=yaml.Loader) + yaml.safe_load(descriptors.db_vnfds_revisions_text), ) + self.db.create_list("nsds", yaml.safe_load(descriptors.db_nsds_text)) + self.db.create_list("nsrs", yaml.safe_load(descriptors.db_nsrs_text)) self.db.create_list( "vim_accounts", - yaml.load(descriptors.db_vim_accounts_text, Loader=yaml.Loader), + yaml.safe_load(descriptors.db_vim_accounts_text), ) self.db.create_list( "k8sclusters", - yaml.load(descriptors.db_k8sclusters_text, Loader=yaml.Loader), + yaml.safe_load(descriptors.db_k8sclusters_text), ) self.db.create_list( - "nslcmops", yaml.load(descriptors.db_nslcmops_text, Loader=yaml.Loader) - ) - self.db.create_list( - "vnfrs", yaml.load(descriptors.db_vnfrs_text, Loader=yaml.Loader) - ) - self.db_vim_accounts = yaml.load( - descriptors.db_vim_accounts_text, Loader=yaml.Loader + "nslcmops", yaml.safe_load(descriptors.db_nslcmops_text) ) + self.db.create_list("vnfrs", yaml.safe_load(descriptors.db_vnfrs_text)) + self.db_vim_accounts = yaml.safe_load(descriptors.db_vim_accounts_text) # Mock kafka self.msg = asynctest.Mock(MsgKafka()) @@ -390,6 +376,38 @@ class TestMyNS(asynctest.TestCase): # await self.test_instantiate() # # this will check that the initial-congig-primitive 'not_to_be_called' is not called + @asynctest.fail_on(active_handles=True) + async def test_start_stop_rebuild_pass(self): + nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops"] + vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"] + additional_param = {"count-index": "0"} + operation_type = "start" + await self.my_ns.rebuild_start_stop( + nsr_id, nslcmop_id, vnf_id, additional_param, operation_type + ) + expected_value = "COMPLETED" + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + self.assertEqual(return_value, expected_value) + + @asynctest.fail_on(active_handles=True) + async def test_start_stop_rebuild_fail(self): + nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops1"] + vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"] + additional_param = {"count-index": "0"} + operation_type = "stop" + await self.my_ns.rebuild_start_stop( + nsr_id, nslcmop_id, vnf_id, additional_param, operation_type + ) + expected_value = "Error" + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + self.assertEqual(return_value, expected_value) + # Test scale() and related methods @asynctest.fail_on(active_handles=True) # all async tasks must be completed async def test_scale(self): @@ -797,16 +815,53 @@ class TestMyNS(asynctest.TestCase): nsr_id = descriptors.test_ids["TEST-UPDATE"]["ns"] nslcmop_id = descriptors.test_ids["TEST-UPDATE"]["removeVnf"] vnf_instance_id = descriptors.test_ids["TEST-UPDATE"]["vnf"] - self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update")) - await self.my_ns.update(nsr_id, nslcmop_id) - expected_value = "COMPLETED" + mock_wait_ng_ro = asynctest.CoroutineMock() + with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro): + await self.my_ns.update(nsr_id, nslcmop_id) + expected_value = "COMPLETED" + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + self.assertEqual(return_value, expected_value) + with self.assertRaises(Exception) as context: + self.db.get_one("vnfrs", {"_id": vnf_instance_id}) + self.assertTrue( + "database exception Not found entry with filter" + in str(context.exception) + ) + + # test vertical scale executes sucessfully + # @patch("osm_lcm.ng_ro.status.response") + @asynctest.fail_on(active_handles=True) + async def test_vertical_scaling(self): + nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate"] + + # calling the vertical scale fucntion + # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update")) + mock_wait_ng_ro = asynctest.CoroutineMock() + with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro): + await self.my_ns.vertical_scale(nsr_id, nslcmop_id) + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + expected_value = "COMPLETED" + self.assertEqual(return_value, expected_value) + + # test vertical scale executes fail + @asynctest.fail_on(active_handles=True) + async def test_vertical_scaling_fail(self): + # get th nsr nad nslcmops id from descriptors + nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate-1"] + + # calling the vertical scale fucntion + await self.my_ns.vertical_scale(nsr_id, nslcmop_id) return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( "operationState" ) + expected_value = "FAILED" self.assertEqual(return_value, expected_value) - with self.assertRaises(Exception) as context: - self.db.get_one("vnfrs", {"_id": vnf_instance_id}) - self.assertTrue("database exception Not found entry with filter" in str(context.exception)) # async def test_instantiate_pdu(self): # nsr_id = descriptors.test_ids["TEST-A"]["ns"] @@ -981,7 +1036,7 @@ class TestMyNS(asynctest.TestCase): mock_charm_hash.assert_called_with( "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple", - "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple", + "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:3/hackfest_3charmed_vnfd/charms/simple", ) self.assertEqual(fs.sync.call_count, 2) @@ -1151,7 +1206,7 @@ class TestMyNS(asynctest.TestCase): mock_charm_hash.assert_called_with( "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple", - "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple", + "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:3/hackfest_3charmed_vnfd/charms/simple", ) self.assertEqual(fs.sync.call_count, 2) @@ -1491,7 +1546,7 @@ class TestMyNS(asynctest.TestCase): result = find_software_version(db_vnfd) self.assertEqual(result, expected_result, "VNFD software version is wrong") - with self.subTest(i=3, t="Check charm hash, Hash has did not change"): + with self.subTest(i=3, t="Check charm hash, Hash did not change"): # Testing method check_charm_hash_changed current_path, target_path = "/tmp/charm1", "/tmp/charm1"