X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Ftests%2Ftest_ns.py;h=d7192c9196846a1908db0f963a4e739780b94fe6;hb=refs%2Fchanges%2F84%2F11984%2F12;hp=7613a88eeb369992b1e72d04c59e39a70af1abfb;hpb=ca7ece02d870a16bdcd47dab7c5ac30c82c58545;p=osm%2FLCM.git diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index 7613a88..d7192c9 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -390,6 +390,38 @@ class TestMyNS(asynctest.TestCase): # await self.test_instantiate() # # this will check that the initial-congig-primitive 'not_to_be_called' is not called + @asynctest.fail_on(active_handles=True) + async def test_start_stop_rebuild_pass(self): + nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops"] + vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"] + additional_param = {"count-index": "0"} + operation_type = "start" + await self.my_ns.rebuild_start_stop( + nsr_id, nslcmop_id, vnf_id, additional_param, operation_type + ) + expected_value = "COMPLETED" + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + self.assertEqual(return_value, expected_value) + + @asynctest.fail_on(active_handles=True) + async def test_start_stop_rebuild_fail(self): + nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops1"] + vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"] + additional_param = {"count-index": "0"} + operation_type = "stop" + await self.my_ns.rebuild_start_stop( + nsr_id, nslcmop_id, vnf_id, additional_param, operation_type + ) + expected_value = "Error" + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + self.assertEqual(return_value, expected_value) + # Test scale() and related methods @asynctest.fail_on(active_handles=True) # all async tasks must be completed async def test_scale(self): @@ -797,16 +829,50 @@ class TestMyNS(asynctest.TestCase): nsr_id = descriptors.test_ids["TEST-UPDATE"]["ns"] nslcmop_id = descriptors.test_ids["TEST-UPDATE"]["removeVnf"] vnf_instance_id = descriptors.test_ids["TEST-UPDATE"]["vnf"] - self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update")) - await self.my_ns.update(nsr_id, nslcmop_id) - expected_value = "COMPLETED" + mock_wait_ng_ro = asynctest.CoroutineMock() + with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro): + await self.my_ns.update(nsr_id, nslcmop_id) + expected_value = "COMPLETED" + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + self.assertEqual(return_value, expected_value) + with self.assertRaises(Exception) as context: + self.db.get_one("vnfrs", {"_id": vnf_instance_id}) + self.assertTrue("database exception Not found entry with filter" in str(context.exception)) + + # test vertical scale executes sucessfully + # @patch("osm_lcm.ng_ro.status.response") + @asynctest.fail_on(active_handles=True) + async def test_vertical_scaling(self): + nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate"] + + # calling the vertical scale fucntion + # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update")) + mock_wait_ng_ro = asynctest.CoroutineMock() + with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro): + await self.my_ns.vertical_scale(nsr_id, nslcmop_id) + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + expected_value = "COMPLETED" + self.assertEqual(return_value, expected_value) + + # test vertical scale executes fail + @asynctest.fail_on(active_handles=True) + async def test_vertical_scaling_fail(self): + # get th nsr nad nslcmops id from descriptors + nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate-1"] + + # calling the vertical scale fucntion + await self.my_ns.vertical_scale(nsr_id, nslcmop_id) return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( "operationState" ) + expected_value = "FAILED" self.assertEqual(return_value, expected_value) - with self.assertRaises(Exception) as context: - self.db.get_one("vnfrs", {"_id": vnf_instance_id}) - self.assertTrue("database exception Not found entry with filter" in str(context.exception)) # async def test_instantiate_pdu(self): # nsr_id = descriptors.test_ids["TEST-A"]["ns"] @@ -981,7 +1047,7 @@ class TestMyNS(asynctest.TestCase): mock_charm_hash.assert_called_with( "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple", - "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple", + "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:3/hackfest_3charmed_vnfd/charms/simple", ) self.assertEqual(fs.sync.call_count, 2) @@ -1151,7 +1217,7 @@ class TestMyNS(asynctest.TestCase): mock_charm_hash.assert_called_with( "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple", - "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple", + "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:3/hackfest_3charmed_vnfd/charms/simple", ) self.assertEqual(fs.sync.call_count, 2)