for expect_text in expect_text_list:
self.assertIn(expect_text, str(e.exception).lower(),
"Expected '{}' at exception text".format(expect_text))
+
+ def test_delete_ns(self):
+ self.db.create_list("nsrs", yaml.load(db_nsrs_text, Loader=yaml.Loader))
+ self.nsr = self.db.get_list("nsrs")[0]
+ self.nsr_id = self.nsr["_id"]
+ self.db_set_one = self.db.set_one
+ p_id = self.nsd_project
+ p_other = "other_p"
+
+ session = {"force": False, "admin": False, "public": None, "project_id": [p_id], "method": "delete"}
+ session2 = {"force": False, "admin": False, "public": None, "project_id": [p_other], "method": "delete"}
+ session_force = {"force": True, "admin": True, "public": None, "project_id": [], "method": "delete"}
+ with self.subTest(i=1, t='Normal Deletion'):
+ self.db.del_one = Mock()
+ self.db.set_one = Mock()
+ self.nsr_topic.delete(session, self.nsr_id)
+
+ db_args = self.db.del_one.call_args[0]
+ msg_args = self.msg.write.call_args[0]
+ self.assertEqual(msg_args[0], self.nsr_topic.topic_msg, "Wrong message topic")
+ self.assertEqual(msg_args[1], "deleted", "Wrong message action")
+ self.assertEqual(msg_args[2], {"_id": self.nsr_id}, "Wrong message content")
+ self.assertEqual(db_args[0], self.nsr_topic.topic, "Wrong DB topic")
+ self.assertEqual(db_args[1]["_id"], self.nsr_id, "Wrong DB ID")
+ self.assertEqual(db_args[1]["_admin.projects_read.cont"], [p_id], "Wrong DB filter")
+ self.db.set_one.assert_not_called()
+ fs_del_calls = self.fs.file_delete.call_args_list
+ self.assertEqual(fs_del_calls[0][0][0], self.nsr_id, "Wrong FS file id")
+ with self.subTest(i=2, t='No delete because referenced by other project'):
+ self.db_set_one("nsrs", {"_id": self.nsr_id}, update_dict=None, push={"_admin.projects_read": p_other,
+ "_admin.projects_write": p_other})
+ self.db.del_one.reset_mock()
+ self.db.set_one.reset_mock()
+ self.msg.write.reset_mock()
+ self.fs.file_delete.reset_mock()
+
+ self.nsr_topic.delete(session2, self.nsr_id)
+ self.db.del_one.assert_not_called()
+ self.msg.write.assert_not_called()
+ db_s1_args = self.db.set_one.call_args
+ self.assertEqual(db_s1_args[0][0], self.nsr_topic.topic, "Wrong DB topic")
+ self.assertEqual(db_s1_args[0][1]["_id"], self.nsr_id, "Wrong DB ID")
+ self.assertIsNone(db_s1_args[1]["update_dict"], "Wrong DB update dictionary")
+ self.assertEqual(db_s1_args[1]["pull_list"],
+ {"_admin.projects_read": [p_other], "_admin.projects_write": [p_other]},
+ "Wrong DB pull_list dictionary")
+ self.fs.file_delete.assert_not_called()
+ with self.subTest(i=4, t='Delete with force and admin'):
+ self.db.del_one.reset_mock()
+ self.db.set_one.reset_mock()
+ self.msg.write.reset_mock()
+ self.fs.file_delete.reset_mock()
+ self.nsr_topic.delete(session_force, self.nsr_id)
+
+ db_args = self.db.del_one.call_args[0]
+ msg_args = self.msg.write.call_args[0]
+ self.assertEqual(msg_args[0], self.nsr_topic.topic_msg, "Wrong message topic")
+ self.assertEqual(msg_args[1], "deleted", "Wrong message action")
+ self.assertEqual(msg_args[2], {"_id": self.nsr_id}, "Wrong message content")
+ self.assertEqual(db_args[0], self.nsr_topic.topic, "Wrong DB topic")
+ self.assertEqual(db_args[1]["_id"], self.nsr_id, "Wrong DB ID")
+ self.db.set_one.assert_not_called()
+ fs_del_calls = self.fs.file_delete.call_args_list
+ self.assertEqual(fs_del_calls[0][0][0], self.nsr_id, "Wrong FS file id")
+ with self.subTest(i=3, t='Conflict on Delete - NS in INSTANTIATED state'):
+ self.db_set_one("nsrs", {"_id": self.nsr_id}, {"_admin.nsState": "INSTANTIATED"},
+ pull={"_admin.projects_read": p_other, "_admin.projects_write": p_other})
+ self.db.del_one.reset_mock()
+ self.db.set_one.reset_mock()
+ self.msg.write.reset_mock()
+ self.fs.file_delete.reset_mock()
+
+ with self.assertRaises(EngineException, msg="Accepted NSR with nsState INSTANTIATED") as e:
+ self.nsr_topic.delete(session, self.nsr_id)
+ self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
+ self.assertIn("INSTANTIATED", str(e.exception), "Wrong exception text")
+ # TODOD with self.subTest(i=3, t='Conflict on Delete - NS in use by NSI'):
+
+ with self.subTest(i=4, t='Non-existent NS'):
+ self.db.del_one.reset_mock()
+ self.db.set_one.reset_mock()
+ self.msg.write.reset_mock()
+ self.fs.file_delete.reset_mock()
+ excp_msg = "Not found"
+ with self.assertRaises(DbException, msg="Accepted non-existent NSD ID") as e:
+ self.nsr_topic.delete(session2, "other_id")
+ self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
+ self.assertIn(excp_msg, str(e.exception), "Wrong exception text")
+ self.assertIn("other_id", str(e.exception), "Wrong exception text")
+ return