def _ro_status(self, *args, **kwargs):
print("Args > {}".format(args))
print("kwargs > {}".format(kwargs))
+ if args:
+ if "update" in args:
+ ro_ns_desc = yaml.load(
+ descriptors.ro_update_action_text, Loader=yaml.Loader
+ )
+ while True:
+ yield ro_ns_desc
if kwargs.get("delete"):
ro_ns_desc = yaml.load(
descriptors.ro_delete_action_text, Loader=yaml.Loader
or expected_kdu_model in nsr_kdu_model_result
)
+ # Test remove_vnf() and related methods
+ @asynctest.fail_on(active_handles=True) # all async tasks must be completed
+ async def test_remove_vnf(self):
+ # Test REMOVE_VNF
+ nsr_id = descriptors.test_ids["TEST-UPDATE"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-UPDATE"]["removeVnf"]
+ vnf_instance_id = descriptors.test_ids["TEST-UPDATE"]["vnf"]
+ self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
+ await self.my_ns.update(nsr_id, nslcmop_id)
+ expected_value = "COMPLETED"
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ self.assertEqual(return_value, expected_value)
+ with self.assertRaises(Exception) as context:
+ self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ self.assertTrue("database exception Not found entry with filter" in str(context.exception))
+
# async def test_instantiate_pdu(self):
# nsr_id = descriptors.test_ids["TEST-A"]["ns"]
# nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]