print("kwargs > {}".format(kwargs))
if args:
if "update" in args:
- ro_ns_desc = yaml.load(
- descriptors.ro_update_action_text, Loader=yaml.Loader
- )
+ ro_ns_desc = yaml.safe_load(descriptors.ro_update_action_text)
while True:
yield ro_ns_desc
if kwargs.get("delete"):
- ro_ns_desc = yaml.load(
- descriptors.ro_delete_action_text, Loader=yaml.Loader
- )
+ ro_ns_desc = yaml.safe_load(descriptors.ro_delete_action_text)
while True:
yield ro_ns_desc
- ro_ns_desc = yaml.load(descriptors.ro_ns_text, Loader=yaml.Loader)
+ ro_ns_desc = yaml.safe_load(descriptors.ro_ns_text)
# if ip address provided, replace descriptor
ip_addresses = getenv("OSMLCMTEST_NS_IPADDRESS", "")
Database.instance = None
self.db = Database({"database": {"driver": "memory"}}).instance.db
- self.db.create_list(
- "vnfds", yaml.load(descriptors.db_vnfds_text, Loader=yaml.Loader)
- )
+ self.db.create_list("vnfds", yaml.safe_load(descriptors.db_vnfds_text))
self.db.create_list(
"vnfds_revisions",
- yaml.load(descriptors.db_vnfds_revisions_text, Loader=yaml.Loader),
- )
- self.db.create_list(
- "nsds", yaml.load(descriptors.db_nsds_text, Loader=yaml.Loader)
- )
- self.db.create_list(
- "nsrs", yaml.load(descriptors.db_nsrs_text, Loader=yaml.Loader)
+ yaml.safe_load(descriptors.db_vnfds_revisions_text),
)
+ self.db.create_list("nsds", yaml.safe_load(descriptors.db_nsds_text))
+ self.db.create_list("nsrs", yaml.safe_load(descriptors.db_nsrs_text))
self.db.create_list(
"vim_accounts",
- yaml.load(descriptors.db_vim_accounts_text, Loader=yaml.Loader),
+ yaml.safe_load(descriptors.db_vim_accounts_text),
)
self.db.create_list(
"k8sclusters",
- yaml.load(descriptors.db_k8sclusters_text, Loader=yaml.Loader),
+ yaml.safe_load(descriptors.db_k8sclusters_text),
)
self.db.create_list(
- "nslcmops", yaml.load(descriptors.db_nslcmops_text, Loader=yaml.Loader)
- )
- self.db.create_list(
- "vnfrs", yaml.load(descriptors.db_vnfrs_text, Loader=yaml.Loader)
- )
- self.db_vim_accounts = yaml.load(
- descriptors.db_vim_accounts_text, Loader=yaml.Loader
+ "nslcmops", yaml.safe_load(descriptors.db_nslcmops_text)
)
+ self.db.create_list("vnfrs", yaml.safe_load(descriptors.db_vnfrs_text))
+ self.db_vim_accounts = yaml.safe_load(descriptors.db_vim_accounts_text)
# Mock kafka
self.msg = asynctest.Mock(MsgKafka())
# await self.test_instantiate()
# # this will check that the initial-congig-primitive 'not_to_be_called' is not called
+ @asynctest.fail_on(active_handles=True)
+ async def test_start_stop_rebuild_pass(self):
+ nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops"]
+ vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+ additional_param = {"count-index": "0"}
+ operation_type = "start"
+ await self.my_ns.rebuild_start_stop(
+ nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+ )
+ expected_value = "COMPLETED"
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ self.assertEqual(return_value, expected_value)
+
+ @asynctest.fail_on(active_handles=True)
+ async def test_start_stop_rebuild_fail(self):
+ nsr_id = descriptors.test_ids["TEST-OP-VNF"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-OP-VNF"]["nslcmops1"]
+ vnf_id = descriptors.test_ids["TEST-OP-VNF"]["vnfrs"]
+ additional_param = {"count-index": "0"}
+ operation_type = "stop"
+ await self.my_ns.rebuild_start_stop(
+ nsr_id, nslcmop_id, vnf_id, additional_param, operation_type
+ )
+ expected_value = "Error"
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ self.assertEqual(return_value, expected_value)
+
# Test scale() and related methods
@asynctest.fail_on(active_handles=True) # all async tasks must be completed
async def test_scale(self):
nsr_id = descriptors.test_ids["TEST-UPDATE"]["ns"]
nslcmop_id = descriptors.test_ids["TEST-UPDATE"]["removeVnf"]
vnf_instance_id = descriptors.test_ids["TEST-UPDATE"]["vnf"]
- self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
- await self.my_ns.update(nsr_id, nslcmop_id)
- expected_value = "COMPLETED"
+ mock_wait_ng_ro = asynctest.CoroutineMock()
+ with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro):
+ await self.my_ns.update(nsr_id, nslcmop_id)
+ expected_value = "COMPLETED"
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ self.assertEqual(return_value, expected_value)
+ with self.assertRaises(Exception) as context:
+ self.db.get_one("vnfrs", {"_id": vnf_instance_id})
+ self.assertTrue(
+ "database exception Not found entry with filter"
+ in str(context.exception)
+ )
+
+ # test vertical scale executes sucessfully
+ # @patch("osm_lcm.ng_ro.status.response")
+ @asynctest.fail_on(active_handles=True)
+ async def test_vertical_scaling(self):
+ nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate"]
+
+ # calling the vertical scale fucntion
+ # self.my_ns.RO.status = asynctest.CoroutineMock(self.my_ns.RO.status, side_effect=self._ro_status("update"))
+ mock_wait_ng_ro = asynctest.CoroutineMock()
+ with patch("osm_lcm.ns.NsLcm._wait_ng_ro", mock_wait_ng_ro):
+ await self.my_ns.vertical_scale(nsr_id, nslcmop_id)
+ return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
+ "operationState"
+ )
+ expected_value = "COMPLETED"
+ self.assertEqual(return_value, expected_value)
+
+ # test vertical scale executes fail
+ @asynctest.fail_on(active_handles=True)
+ async def test_vertical_scaling_fail(self):
+ # get th nsr nad nslcmops id from descriptors
+ nsr_id = descriptors.test_ids["TEST-V-SCALE"]["ns"]
+ nslcmop_id = descriptors.test_ids["TEST-V-SCALE"]["instantiate-1"]
+
+ # calling the vertical scale fucntion
+ await self.my_ns.vertical_scale(nsr_id, nslcmop_id)
return_value = self.db.get_one("nslcmops", {"_id": nslcmop_id}).get(
"operationState"
)
+ expected_value = "FAILED"
self.assertEqual(return_value, expected_value)
- with self.assertRaises(Exception) as context:
- self.db.get_one("vnfrs", {"_id": vnf_instance_id})
- self.assertTrue("database exception Not found entry with filter" in str(context.exception))
# async def test_instantiate_pdu(self):
# nsr_id = descriptors.test_ids["TEST-A"]["ns"]
result = find_software_version(db_vnfd)
self.assertEqual(result, expected_result, "VNFD software version is wrong")
- with self.subTest(i=3, t="Check charm hash, Hash has did not change"):
+ with self.subTest(i=3, t="Check charm hash, Hash did not change"):
# Testing method check_charm_hash_changed
current_path, target_path = "/tmp/charm1", "/tmp/charm1"