UserTopicAuth,
CommonVimWimSdn,
VcaTopic,
+ PaasTopic,
)
from osm_nbi.engine import EngineException
from osm_nbi.authconn import AuthconnNotFoundException
mock_check_conflict_on_del.assert_not_called()
+class TestPaaSTopic(TestCase):
+ def setUp(self):
+ self.db = Mock(dbbase.DbBase())
+ self.fs = Mock(fsbase.FsBase())
+ self.msg = Mock(msgbase.MsgBase())
+ self.auth = Mock(authconn.Authconn(None, None, None))
+ self.paas_topic = PaasTopic(self.db, self.fs, self.msg, self.auth)
+
+ def test_format_on_new(self):
+ content = {
+ "_id": "id",
+ "secret": "secret_to_encrypt",
+ }
+ self.db.encrypt.side_effect = ["encrypted_secret"]
+
+ expecte_oid = "id:0"
+ expected_num_operations = 1
+ oid = self.paas_topic.format_on_new(content)
+
+ self.assertEqual(oid, expecte_oid)
+ self.assertEqual(content["secret"], "encrypted_secret")
+ self.assertEqual(content["_admin"]["operationalState"], "PROCESSING")
+ self.assertEqual(content["_admin"]["current_operation"], None)
+ self.assertEqual(len(content["_admin"]["operations"]), expected_num_operations)
+ self.assertEqual(
+ content["_admin"]["operations"][0]["lcmOperationType"], "create"
+ )
+ self.db.encrypt.assert_called_with(
+ "secret_to_encrypt", schema_version="1.11", salt="id"
+ )
+
+ @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
+ def test_check_conflict_on_new(self, mock_get_project_filter):
+ indata = {"name": "new_paas_name"}
+ session = {}
+ mock_get_project_filter.return_value = {}
+ self.db.get_one.return_value = None
+ self.paas_topic.check_conflict_on_new(session, indata)
+
+ @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
+ def test_check_conflict_on_new_raise_exception(self, mock_get_project_filter):
+ indata = {"name": "new_paas_name"}
+ session = {}
+ mock_get_project_filter.return_value = {}
+ self.db.get_one.return_value = ["Found_PaaS"]
+ with self.assertRaises(EngineException):
+ self.paas_topic.check_conflict_on_new(session, indata)
+
+ @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
+ def test_check_conflict_on_edit(self, mock_get_project_filter):
+ edit_content = {"name": "new_paas_name"}
+ final_content = {}
+ session = {"force": None}
+ mock_get_project_filter.return_value = {}
+ self.db.get_one.return_value = None
+ self.paas_topic.check_conflict_on_edit(
+ session, final_content, edit_content, "id"
+ )
+
+ @patch("osm_nbi.base_topic.BaseTopic._get_project_filter")
+ def test_check_conflict_on_edit_raise_exception(self, mock_get_project_filter):
+ edit_content = {"name": "new_paas_name"}
+ final_content = {}
+ session = {"force": None}
+ mock_get_project_filter.return_value = {}
+ self.db.get_one.return_value = ["Found_PaaS"]
+ with self.assertRaises(EngineException):
+ self.paas_topic.check_conflict_on_edit(
+ session, final_content, edit_content, "id"
+ )
+
+ def test_format_on_edit(self):
+ edit_content = {
+ "_id": "id",
+ "secret": "secret_to_encrypt",
+ }
+ final_content = {
+ "_id": "id",
+ "_admin": {"operations": [{"lcmOperationType": "create"}]},
+ "schema_version": "1.11",
+ }
+ self.db.encrypt.side_effect = ["encrypted_secret"]
+ expected_oid = "id:1"
+ expected_num_operations = 2
+ print(self.paas_topic.password_to_encrypt)
+ oid = self.paas_topic.format_on_edit(final_content, edit_content)
+
+ self.assertEqual(oid, expected_oid)
+ self.assertEqual(final_content["secret"], "encrypted_secret")
+ self.assertEqual(
+ len(final_content["_admin"]["operations"]), expected_num_operations
+ )
+ self.assertEqual(final_content["_admin"]["operationalState"], "PROCESSING")
+ self.assertEqual(final_content["_admin"]["detailed-status"], "Editing")
+ self.db.encrypt.assert_called_with(
+ "secret_to_encrypt", schema_version="1.11", salt="id"
+ )
+
+
class Test_ProjectTopicAuth(TestCase):
@classmethod
def setUpClass(cls):
norm(str(e.exception)),
"Wrong exception text",
)
+ with self.subTest(i=3):
+ self.auth.get_user_list.side_effect = [[user], []]
+ self.auth.get_user.return_value = user
+ old_password = self.test_name
+ new_pasw = "new-password"
+ self.topic.edit(
+ self.fake_session,
+ uid,
+ {
+ "old_password": old_password,
+ "password": new_pasw,
+ },
+ )
+ content = self.auth.update_user.call_args[0][0]
+ self.assertEqual(
+ content["old_password"], old_password, "Wrong old password"
+ )
+ self.assertEqual(content["password"], new_pasw, "Wrong user password")
def test_delete_user(self):
with self.subTest(i=1):