X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Ftests%2Ftest_ns.py;h=d7192c9196846a1908db0f963a4e739780b94fe6;hb=refs%2Fchanges%2F84%2F11984%2F12;hp=5e248a2bf3282e0fdd9f9dee14bb9bce29e90a61;hpb=1b782761acbcbbe4666d667a0f04a44f136ef1a3;p=osm%2FLCM.git diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index 5e248a2..d7192c9 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -390,6 +390,38 @@ class TestMyNS(asynctest.TestCase): # await self.test_instantiate() # # this will check that the initial-congig-primitive 'not_to_be_called' is not called + @asynctest.fail_on(active_handles=True) + async def test_start_stop_rebuild_pass(self): + nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops"] + vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"] + additional_param = {"count-index": "0"} + operation_type = "start" + await self.my_ns.rebuild_start_stop( + nsr_id, nslcmop_id, vnf_id, additional_param, operation_type + ) + expected_value = "COMPLETED" + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + self.assertEqual(return_value, expected_value) + + @asynctest.fail_on(active_handles=True) + async def test_start_stop_rebuild_fail(self): + nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops1"] + vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"] + additional_param = {"count-index": "0"} + operation_type = "stop" + await self.my_ns.rebuild_start_stop( + nsr_id, nslcmop_id, vnf_id, additional_param, operation_type + ) + expected_value = "Error" + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + self.assertEqual(return_value, expected_value) + # Test scale() and related methods @asynctest.fail_on(active_handles=True) # all async tasks must be completed async def test_scale(self): @@ -809,6 +841,39 @@ class TestMyNS(asynctest.TestCase): self.db.get_one("vnfrs", {"_id": vnf_instance_id}) self.assertTrue("database exception Not found entry with filter" in str(context.exception)) + # test vertical scale executes sucessfully + # @patch("osm_lcm.ng_ro.status.response") + @asynctest.fail_on(active_handles=True) + async def test_vertical_scaling(self): + nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate"] + + # calling the vertical scale fucntion + # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update")) + mock_wait_ng_ro = asynctest.CoroutineMock() + with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro): + await self.my_ns.vertical_scale(nsr_id, nslcmop_id) + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + expected_value = "COMPLETED" + self.assertEqual(return_value, expected_value) + + # test vertical scale executes fail + @asynctest.fail_on(active_handles=True) + async def test_vertical_scaling_fail(self): + # get th nsr nad nslcmops id from descriptors + nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate-1"] + + # calling the vertical scale fucntion + await self.my_ns.vertical_scale(nsr_id, nslcmop_id) + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + expected_value = "FAILED" + self.assertEqual(return_value, expected_value) + # async def test_instantiate_pdu(self): # nsr_id = descriptors.test_ids["TEST-A"]["ns"] # nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]