# await self.test_instantiate()
# # this will check that the initial-congig-primitive 'not_to_be_called' is not called
+ @asynctest.fail_on(active_handles=True)
+ async def test_start_stop_rebuild_pass(self):
+ nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops"]
+ vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+ additional_param = {"count-index": "0"}
+ operation_type = "start"
+ await self.my_ns.rebuild_start_stop(
+ nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+ )
+ expected_value = "COMPLETED"
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ self.assertEqual(return_value, expected_value)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_start_stop_rebuild_fail(self):
+ nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops1"]
+ vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+ additional_param = {"count-index": "0"}
+ operation_type = "stop"
+ await self.my_ns.rebuild_start_stop(
+ nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+ )
+ expected_value = "Error"
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ self.assertEqual(return_value, expected_value)
+
# Test scale() and related methods
@asynctest.fail_on(active_handles=True) # all async tasks must be completed
async def test_scale(self):
nsr_id = descriptors.test_ids["TEST-UPDATE"]["ns"]
nslcmop_id = descriptors.test_ids["TEST-UPDATE"]["removeVnf"]
vnf_instance_id = descriptors.test_ids["TEST-UPDATE"]["vnf"]
- self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
- await self.my_ns.update(nsr_id, nslcmop_id)
- expected_value = "COMPLETED"
- return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
- "operationState"
- )
- self.assertEqual(return_value, expected_value)
- with self.assertRaises(Exception) as context:
- self.db.get_one("vnfrs", {"_id": vnf_instance_id})
- self.assertTrue("database exception Not found entry with filter" in str(context.exception))
+ mock_wait_ng_ro = asynctest.CoroutineMock()
+ with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro):
+ await self.my_ns.update(nsr_id, nslcmop_id)
+ expected_value = "COMPLETED"
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ self.assertEqual(return_value, expected_value)
+ with self.assertRaises(Exception) as context:
+ self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ self.assertTrue("database exception Not found entry with filter" in str(context.exception))
# async def test_instantiate_pdu(self):
# nsr_id = descriptors.test_ids["TEST-A"]["ns"]