def _ro_status(self, *args, **kwargs):
print("Args > {}".format(args))
print("kwargs > {}".format(kwargs))
+ if args:
+ if "update" in args:
+ ro_ns_desc = yaml.load(
+ descriptors.ro_update_action_text, Loader=yaml.Loader
+ )
+ while True:
+ yield ro_ns_desc
if kwargs.get("delete"):
ro_ns_desc = yaml.load(
descriptors.ro_delete_action_text, Loader=yaml.Loader
# await self.test_instantiate()
# # this will check that the initial-congig-primitive 'not_to_be_called' is not called
+ @asynctest.fail_on(active_handles=True)
+ async def test_start_stop_rebuild_pass(self):
+ nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops"]
+ vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+ additional_param = {"count-index": "0"}
+ operation_type = "start"
+ await self.my_ns.rebuild_start_stop(
+ nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+ )
+ expected_value = "COMPLETED"
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ self.assertEqual(return_value, expected_value)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_start_stop_rebuild_fail(self):
+ nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops1"]
+ vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+ additional_param = {"count-index": "0"}
+ operation_type = "stop"
+ await self.my_ns.rebuild_start_stop(
+ nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+ )
+ expected_value = "Error"
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ self.assertEqual(return_value, expected_value)
+
# Test scale() and related methods
@asynctest.fail_on(active_handles=True) # all async tasks must be completed
async def test_scale(self):
or expected_kdu_model in nsr_kdu_model_result
)
+ # Test remove_vnf() and related methods
+ @asynctest.fail_on(active_handles=True) # all async tasks must be completed
+ async def test_remove_vnf(self):
+ # Test REMOVE_VNF
+ nsr_id = descriptors.test_ids["TEST-UPDATE"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-UPDATE"]["removeVnf"]
+ vnf_instance_id = descriptors.test_ids["TEST-UPDATE"]["vnf"]
+ mock_wait_ng_ro = asynctest.CoroutineMock()
+ with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro):
+ await self.my_ns.update(nsr_id, nslcmop_id)
+ expected_value = "COMPLETED"
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ self.assertEqual(return_value, expected_value)
+ with self.assertRaises(Exception) as context:
+ self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ self.assertTrue("database exception Not found entry with filter" in str(context.exception))
+
+ # test vertical scale executes sucessfully
+ # @patch("osm_lcm.ng_ro.status.response")
+ @asynctest.fail_on(active_handles=True)
+ async def test_vertical_scaling(self):
+ nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate"]
+
+ # calling the vertical scale fucntion
+ # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
+ mock_wait_ng_ro = asynctest.CoroutineMock()
+ with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro):
+ await self.my_ns.vertical_scale(nsr_id, nslcmop_id)
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ expected_value = "COMPLETED"
+ self.assertEqual(return_value, expected_value)
+
+ # test vertical scale executes fail
+ @asynctest.fail_on(active_handles=True)
+ async def test_vertical_scaling_fail(self):
+ # get th nsr nad nslcmops id from descriptors
+ nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate-1"]
+
+ # calling the vertical scale fucntion
+ await self.my_ns.vertical_scale(nsr_id, nslcmop_id)
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ expected_value = "FAILED"
+ self.assertEqual(return_value, expected_value)
+
# async def test_instantiate_pdu(self):
# nsr_id = descriptors.test_ids["TEST-A"]["ns"]
# nslcmop_id = descriptors.test_ids["TEST-A"]["instantiate"]
mock_charm_hash.assert_called_with(
"7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple",
- "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple",
+ "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:3/hackfest_3charmed_vnfd/charms/simple",
)
self.assertEqual(fs.sync.call_count, 2)
mock_charm_hash.assert_called_with(
"7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:1/hackfest_3charmed_vnfd/charms/simple",
- "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77/hackfest_3charmed_vnfd/charms/simple",
+ "7637bcf8-cf14-42dc-ad70-c66fcf1e6e77:3/hackfest_3charmed_vnfd/charms/simple",
)
self.assertEqual(fs.sync.call_count, 2)