nsr_id = descriptors.test_ids["TEST-UPDATE"]["ns"]
nslcmop_id = descriptors.test_ids["TEST-UPDATE"]["removeVnf"]
vnf_instance_id = descriptors.test_ids["TEST-UPDATE"]["vnf"]
- self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
- await self.my_ns.update(nsr_id, nslcmop_id)
- expected_value = "COMPLETED"
- return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
- "operationState"
- )
- self.assertEqual(return_value, expected_value)
- with self.assertRaises(Exception) as context:
- self.db.get_one("vnfrs", {"_id": vnf_instance_id})
- self.assertTrue("database exception Not found entry with filter" in str(context.exception))
+ mock_wait_ng_ro = asynctest.CoroutineMock()
+ with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro):
+ await self.my_ns.update(nsr_id, nslcmop_id)
+ expected_value = "COMPLETED"
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ self.assertEqual(return_value, expected_value)
+ with self.assertRaises(Exception) as context:
+ self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ self.assertTrue("database exception Not found entry with filter" in str(context.exception))
# async def test_instantiate_pdu(self):
# nsr_id = descriptors.test_ids["TEST-A"]["ns"]