X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_lcm%2Ftests%2Ftest_ns.py;h=d7192c9196846a1908db0f963a4e739780b94fe6;hb=refs%2Fchanges%2F84%2F11984%2F12;hp=86e2136dd0ebda582851090dd9933a5b138e8155;hpb=dffa6217777142746ed9b5c9a7eaab7c0d8716be;p=osm%2FLCM.git diff --git a/osm_lcm/tests/test_ns.py b/osm_lcm/tests/test_ns.py index 86e2136..d7192c9 100644 --- a/osm_lcm/tests/test_ns.py +++ b/osm_lcm/tests/test_ns.py @@ -120,6 +120,13 @@ class TestMyNS(asynctest.TestCase): def _ro_status(self, *args, **kwargs): print("Args > {}".format(args)) print("kwargs > {}".format(kwargs)) + if args: + if "update" in args: + ro_ns_desc = yaml.load( + descriptors.ro_update_action_text, Loader=yaml.Loader + ) + while True: + yield ro_ns_desc if kwargs.get("delete"): ro_ns_desc = yaml.load( descriptors.ro_delete_action_text, Loader=yaml.Loader @@ -383,6 +390,38 @@ class TestMyNS(asynctest.TestCase): # await self.test_instantiate() # # this will check that the initial-congig-primitive 'not_to_be_called' is not called + @asynctest.fail_on(active_handles=True) + async def test_start_stop_rebuild_pass(self): + nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops"] + vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"] + additional_param = {"count-index": "0"} + operation_type = "start" + await self.my_ns.rebuild_start_stop( + nsr_id, nslcmop_id, vnf_id, additional_param, operation_type + ) + expected_value = "COMPLETED" + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + self.assertEqual(return_value, expected_value) + + @asynctest.fail_on(active_handles=True) + async def test_start_stop_rebuild_fail(self): + nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops1"] + vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"] + additional_param = {"count-index": "0"} + operation_type = "stop" + await self.my_ns.rebuild_start_stop( + nsr_id, nslcmop_id, vnf_id, additional_param, operation_type + ) + expected_value = "Error" + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + self.assertEqual(return_value, expected_value) + # Test scale() and related methods @asynctest.fail_on(active_handles=True) # all async tasks must be completed async def test_scale(self): @@ -783,6 +822,58 @@ class TestMyNS(asynctest.TestCase): or expected_kdu_model in nsr_kdu_model_result ) + # Test remove_vnf() and related methods + @asynctest.fail_on(active_handles=True) # all async tasks must be completed + async def test_remove_vnf(self): + # Test REMOVE_VNF + nsr_id = descriptors.test_ids["TEST-UPDATE"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-UPDATE"]["removeVnf"] + vnf_instance_id = descriptors.test_ids["TEST-UPDATE"]["vnf"] + mock_wait_ng_ro = asynctest.CoroutineMock() + with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro): + await self.my_ns.update(nsr_id, nslcmop_id) + expected_value = "COMPLETED" + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + self.assertEqual(return_value, expected_value) + with self.assertRaises(Exception) as context: + self.db.get_one("vnfrs", {"_id": vnf_instance_id}) + self.assertTrue("database exception Not found entry with filter" in str(context.exception)) + + # test vertical scale executes sucessfully + # @patch("osm_lcm.ng_ro.status.response") + @asynctest.fail_on(active_handles=True) + async def test_vertical_scaling(self): + nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate"] + + # calling the vertical scale fucntion + # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update")) + mock_wait_ng_ro = asynctest.CoroutineMock() + with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro): + await self.my_ns.vertical_scale(nsr_id, nslcmop_id) + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + expected_value = "COMPLETED" + self.assertEqual(return_value, expected_value) + + # test vertical scale executes fail + @asynctest.fail_on(active_handles=True) + async def test_vertical_scaling_fail(self): + # get th nsr nad nslcmops id from descriptors + nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"] + nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate-1"] + + # calling the vertical scale fucntion + await self.my_ns.vertical_scale(nsr_id, nslcmop_id) + return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get( + "operationState" + ) + expected_value = "FAILED" + self.assertEqual(return_value, expected_value) + # async def test_instantiate_pdu(self): # nsr_id = descriptors.test_ids["TEST-A"]["ns"] # nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"] @@ -956,7 +1047,7 @@ class TestMyNS(asynctest.TestCase): mock_charm_hash.assert_called_with( "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple", - "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple", + "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:3/hackfest_3charmed_vnfd/charms/simple", ) self.assertEqual(fs.sync.call_count, 2) @@ -1126,7 +1217,7 @@ class TestMyNS(asynctest.TestCase): mock_charm_hash.assert_called_with( "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple", - "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple", + "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:3/hackfest_3charmed_vnfd/charms/simple", ) self.assertEqual(fs.sync.call_count, 2)