X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Ftests%2Ftest_ns.py;h=20047016645ceef321716447d82db0e6f851a823;hb=refs%2Fchanges%2F23%2F12623%2F1;hp=466976c60c0c846188e97a890210112f624cbfb9;hpb=b827de931cbe2452bcca76b8b24371b4a8afe2aa;p=osm%2FLCM.git diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index 466976c..2004701 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -122,19 +122,15 @@ class TestMyNS(asynctest.TestCase): print("kwargs > {}".format(kwargs)) if args: if "update" in args: - ro_ns_desc = yaml.load( - descriptors.ro_update_action_text, Loader=yaml.Loader - ) + ro_ns_desc = yaml.safe_load(descriptors.ro_update_action_text) while True: yield ro_ns_desc if kwargs.get("delete"): - ro_ns_desc = yaml.load( - descriptors.ro_delete_action_text, Loader=yaml.Loader - ) + ro_ns_desc = yaml.safe_load(descriptors.ro_delete_action_text) while True: yield ro_ns_desc - ro_ns_desc = yaml.load(descriptors.ro_ns_text, Loader=yaml.Loader) + ro_ns_desc = yaml.safe_load(descriptors.ro_ns_text) # if ip address provided, replace descriptor ip_addresses = getenv("OSMLCMTEST_NS_IPADDRESS", "") @@ -176,36 +172,26 @@ class TestMyNS(asynctest.TestCase): Database.instance = None self.db = Database({"database": {"driver": "memory"}}).instance.db - self.db.create_list( - "vnfds", yaml.load(descriptors.db_vnfds_text, Loader=yaml.Loader) - ) + self.db.create_list("vnfds", yaml.safe_load(descriptors.db_vnfds_text)) self.db.create_list( "vnfds_revisions", - yaml.load(descriptors.db_vnfds_revisions_text, Loader=yaml.Loader), - ) - self.db.create_list( - "nsds", yaml.load(descriptors.db_nsds_text, Loader=yaml.Loader) - ) - self.db.create_list( - "nsrs", yaml.load(descriptors.db_nsrs_text, Loader=yaml.Loader) + yaml.safe_load(descriptors.db_vnfds_revisions_text), ) + self.db.create_list("nsds", yaml.safe_load(descriptors.db_nsds_text)) + self.db.create_list("nsrs", yaml.safe_load(descriptors.db_nsrs_text)) self.db.create_list( "vim_accounts", - yaml.load(descriptors.db_vim_accounts_text, Loader=yaml.Loader), + yaml.safe_load(descriptors.db_vim_accounts_text), ) self.db.create_list( "k8sclusters", - yaml.load(descriptors.db_k8sclusters_text, Loader=yaml.Loader), + yaml.safe_load(descriptors.db_k8sclusters_text), ) self.db.create_list( - "nslcmops", yaml.load(descriptors.db_nslcmops_text, Loader=yaml.Loader) - ) - self.db.create_list( - "vnfrs", yaml.load(descriptors.db_vnfrs_text, Loader=yaml.Loader) - ) - self.db_vim_accounts = yaml.load( - descriptors.db_vim_accounts_text, Loader=yaml.Loader + "nslcmops", yaml.safe_load(descriptors.db_nslcmops_text) ) + self.db.create_list("vnfrs", yaml.safe_load(descriptors.db_vnfrs_text)) + self.db_vim_accounts = yaml.safe_load(descriptors.db_vim_accounts_text) # Mock kafka self.msg = asynctest.Mock(MsgKafka()) @@ -839,7 +825,43 @@ class TestMyNS(asynctest.TestCase): self.assertEqual(return_value, expected_value) with self.assertRaises(Exception) as context: self.db.get_one("vnfrs", {"_id": vnf_instance_id}) - self.assertTrue("database exception Not found entry with filter" in str(context.exception)) + self.assertTrue( + "database exception Not found entry with filter" + in str(context.exception) + ) + + # test vertical scale executes sucessfully + # @patch("osm_lcm.ng_ro.status.response") + @asynctest.fail_on(active_handles=True) + async def test_vertical_scaling(self): + nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate"] + + # calling the vertical scale fucntion + # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update")) + mock_wait_ng_ro = asynctest.CoroutineMock() + with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro): + await self.my_ns.vertical_scale(nsr_id, nslcmop_id) + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + expected_value = "COMPLETED" + self.assertEqual(return_value, expected_value) + + # test vertical scale executes fail + @asynctest.fail_on(active_handles=True) + async def test_vertical_scaling_fail(self): + # get th nsr nad nslcmops id from descriptors + nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate-1"] + + # calling the vertical scale fucntion + await self.my_ns.vertical_scale(nsr_id, nslcmop_id) + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + expected_value = "FAILED" + self.assertEqual(return_value, expected_value) # async def test_instantiate_pdu(self): # nsr_id = descriptors.test_ids["TEST-A"]["ns"] @@ -1524,7 +1546,7 @@ class TestMyNS(asynctest.TestCase): result = find_software_version(db_vnfd) self.assertEqual(result, expected_result, "VNFD software version is wrong") - with self.subTest(i=3, t="Check charm hash, Hash has did not change"): + with self.subTest(i=3, t="Check charm hash, Hash did not change"): # Testing method check_charm_hash_changed current_path, target_path = "/tmp/charm1", "/tmp/charm1"