X-Git-Url: https://osm.etsi.org/gitweb/?p=osm%2FNBI.git;a=blobdiff_plain;f=osm_nbi%2Ftests%2Ftest_admin_topics.py;h=734a2896ad81c835f08ae8918dafa48bb280eea6;hp=74528f8f62354567cd2f6d0d72cc9177c71afa17;hb=4568a372eb5a204e04d917213de03ec51f9110c1;hpb=786aeddab7b7491a139c958ecec6439bee5da83a diff --git a/osm_nbi/tests/test_admin_topics.py b/osm_nbi/tests/test_admin_topics.py index 74528f8..734a289 100755 --- a/osm_nbi/tests/test_admin_topics.py +++ b/osm_nbi/tests/test_admin_topics.py @@ -43,7 +43,7 @@ test_name = "test-user" def norm(str): """Normalize string for checking""" - return ' '.join(str.strip().split()).lower() + return " ".join(str.strip().split()).lower() class TestVcaTopic(TestCase): @@ -119,7 +119,7 @@ class TestVcaTopic(TestCase): self.db.get_list.assert_called_with( "vim_accounts", - {"vca": _id, '_admin.projects_read.cont': 'project-id'}, + {"vca": _id, "_admin.projects_read.cont": "project-id"}, ) mock_check_conflict_on_del.assert_called_with(session, _id, db_content) @@ -154,19 +154,18 @@ class TestVcaTopic(TestCase): context.exception, EngineException( "There is at least one VIM account using this vca", - http_code=HTTPStatus.CONFLICT - ) + http_code=HTTPStatus.CONFLICT, + ), ) self.db.get_list.assert_called_with( "vim_accounts", - {"vca": _id, '_admin.projects_read.cont': 'project-id'}, + {"vca": _id, "_admin.projects_read.cont": "project-id"}, ) mock_check_conflict_on_del.assert_not_called() class Test_ProjectTopicAuth(TestCase): - @classmethod def setUpClass(cls): cls.test_name = "test-project-topic" @@ -177,8 +176,15 @@ class Test_ProjectTopicAuth(TestCase): self.msg = Mock(msgbase.MsgBase()) self.auth = Mock(authconn.Authconn(None, None, None)) self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth) - self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None, - "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} + self.fake_session = { + "username": self.test_name, + "project_id": (test_pid,), + "method": None, + "admin": True, + "force": False, + "public": False, + "allow_show_user_project_role": True, + } self.topic.check_quota = Mock(return_value=None) # skip quota def test_new_project(self): @@ -187,37 +193,64 @@ class Test_ProjectTopicAuth(TestCase): pid1 = str(uuid4()) self.auth.get_project_list.return_value = [] self.auth.create_project.return_value = pid1 - pid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "quotas": {}}) + pid2, oid = self.topic.new( + rollback, self.fake_session, {"name": self.test_name, "quotas": {}} + ) self.assertEqual(len(rollback), 1, "Wrong rollback length") self.assertEqual(pid2, pid1, "Wrong project identifier") content = self.auth.create_project.call_args[0][0] self.assertEqual(content["name"], self.test_name, "Wrong project name") self.assertEqual(content["quotas"], {}, "Wrong quotas") self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") - self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") + self.assertEqual( + content["_admin"]["modified"], + content["_admin"]["created"], + "Wrong modification time", + ) with self.subTest(i=2): rollback = [] with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e: - self.topic.new(rollback, self.fake_session, {"name": "other-project-name", "quotas": {"baditems": 10}}) + self.topic.new( + rollback, + self.fake_session, + {"name": "other-project-name", "quotas": {"baditems": 10}}, + ) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'" - .format("baditems"), norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format( + "baditems" + ), + norm(str(e.exception)), + "Wrong exception text", + ) def test_edit_project(self): now = time() pid = str(uuid4()) - proj = {"_id": pid, "name": self.test_name, "_admin": {"created": now, "modified": now}} + proj = { + "_id": pid, + "name": self.test_name, + "_admin": {"created": now, "modified": now}, + } with self.subTest(i=1): self.auth.get_project_list.side_effect = [[proj], []] new_name = "new-project-name" quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)} - self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas}) + self.topic.edit( + self.fake_session, pid, {"name": new_name, "quotas": quotas} + ) _id, content = self.auth.update_project.call_args[0] self.assertEqual(_id, pid, "Wrong project identifier") self.assertEqual(content["_id"], pid, "Wrong project identifier") self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") - self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time") + self.assertGreater( + content["_admin"]["modified"], now, "Wrong modification time" + ) self.assertEqual(content["name"], new_name, "Wrong project name") self.assertEqual(content["quotas"], quotas, "Wrong quotas") with self.subTest(i=2): @@ -225,108 +258,218 @@ class Test_ProjectTopicAuth(TestCase): quotas = {"baditems": randint(0, 100)} self.auth.get_project_list.side_effect = [[proj], []] with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e: - self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas}) - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'" - .format("baditems"), norm(str(e.exception)), "Wrong exception text") + self.topic.edit( + self.fake_session, pid, {"name": new_name, "quotas": quotas} + ) + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format( + "baditems" + ), + norm(str(e.exception)), + "Wrong exception text", + ) def test_conflict_on_new(self): with self.subTest(i=1): rollback = [] pid = str(uuid4()) - with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e: + with self.assertRaises( + EngineException, msg="Accepted uuid as project name" + ) as e: self.topic.new(rollback, self.fake_session, {"name": pid}) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("project name '{}' cannot have an uuid format".format(pid), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "project name '{}' cannot have an uuid format".format(pid), + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=2): rollback = [] - self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}] - with self.assertRaises(EngineException, msg="Accepted existing project name") as e: + self.auth.get_project_list.return_value = [ + {"_id": test_pid, "name": self.test_name} + ] + with self.assertRaises( + EngineException, msg="Accepted existing project name" + ) as e: self.topic.new(rollback, self.fake_session, {"name": self.test_name}) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("project '{}' exists".format(self.test_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "project '{}' exists".format(self.test_name), + norm(str(e.exception)), + "Wrong exception text", + ) def test_conflict_on_edit(self): with self.subTest(i=1): - self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}] + self.auth.get_project_list.return_value = [ + {"_id": test_pid, "name": self.test_name} + ] new_name = str(uuid4()) - with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e: + with self.assertRaises( + EngineException, msg="Accepted uuid as project name" + ) as e: self.topic.edit(self.fake_session, test_pid, {"name": new_name}) - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("project name '{}' cannot have an uuid format".format(new_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "project name '{}' cannot have an uuid format".format(new_name), + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=2): pid = str(uuid4()) self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}] - with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as e: + with self.assertRaises( + EngineException, msg="Accepted renaming of project 'admin'" + ) as e: self.topic.edit(self.fake_session, pid, {"name": "new-name"}) - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("you cannot rename project 'admin'", - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "you cannot rename project 'admin'", + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=3): new_name = "new-project-name" - self.auth.get_project_list.side_effect = [[{"_id": test_pid, "name": self.test_name}], - [{"_id": str(uuid4()), "name": new_name}]] - with self.assertRaises(EngineException, msg="Accepted existing project name") as e: + self.auth.get_project_list.side_effect = [ + [{"_id": test_pid, "name": self.test_name}], + [{"_id": str(uuid4()), "name": new_name}], + ] + with self.assertRaises( + EngineException, msg="Accepted existing project name" + ) as e: self.topic.edit(self.fake_session, pid, {"name": new_name}) - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("project '{}' is already used".format(new_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "project '{}' is already used".format(new_name), + norm(str(e.exception)), + "Wrong exception text", + ) def test_delete_project(self): with self.subTest(i=1): pid = str(uuid4()) - self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"} + self.auth.get_project.return_value = { + "_id": pid, + "name": "other-project-name", + } self.auth.delete_project.return_value = {"deleted": 1} self.auth.get_user_list.return_value = [] self.db.get_list.return_value = [] rc = self.topic.delete(self.fake_session, pid) self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info") - self.assertEqual(self.auth.get_project.call_args[0][0], pid, "Wrong project identifier") - self.assertEqual(self.auth.delete_project.call_args[0][0], pid, "Wrong project identifier") + self.assertEqual( + self.auth.get_project.call_args[0][0], pid, "Wrong project identifier" + ) + self.assertEqual( + self.auth.delete_project.call_args[0][0], + pid, + "Wrong project identifier", + ) def test_conflict_on_del(self): with self.subTest(i=1): - self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name} - with self.assertRaises(EngineException, msg="Accepted deletion of own project") as e: + self.auth.get_project.return_value = { + "_id": test_pid, + "name": self.test_name, + } + with self.assertRaises( + EngineException, msg="Accepted deletion of own project" + ) as e: self.topic.delete(self.fake_session, self.test_name) - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("you cannot delete your own project", norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "you cannot delete your own project", + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=2): self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"} - with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as e: + with self.assertRaises( + EngineException, msg="Accepted deletion of project 'admin'" + ) as e: self.topic.delete(self.fake_session, "admin") - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("you cannot delete project 'admin'", norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "you cannot delete project 'admin'", + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=3): pid = str(uuid4()) name = "other-project-name" self.auth.get_project.return_value = {"_id": pid, "name": name} - self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name, - "project_role_mappings": [{"project": pid, "role": str(uuid4())}]}] - with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e: + self.auth.get_user_list.return_value = [ + { + "_id": str(uuid4()), + "username": self.test_name, + "project_role_mappings": [{"project": pid, "role": str(uuid4())}], + } + ] + with self.assertRaises( + EngineException, msg="Accepted deletion of used project" + ) as e: self.topic.delete(self.fake_session, pid) - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "project '{}' ({}) is being used by user '{}'".format( + name, pid, self.test_name + ), + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=4): self.auth.get_user_list.return_value = [] - self.db.get_list.return_value = [{"_id": str(uuid4()), "id": self.test_name, - "_admin": {"projects_read": [pid], "projects_write": []}}] - with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e: + self.db.get_list.return_value = [ + { + "_id": str(uuid4()), + "id": self.test_name, + "_admin": {"projects_read": [pid], "projects_write": []}, + } + ] + with self.assertRaises( + EngineException, msg="Accepted deletion of used project" + ) as e: self.topic.delete(self.fake_session, pid) - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("project '{}' ({}) is being used by {} '{}'" - .format(name, pid, "vnf descriptor", self.test_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "project '{}' ({}) is being used by {} '{}'".format( + name, pid, "vnf descriptor", self.test_name + ), + norm(str(e.exception)), + "Wrong exception text", + ) class Test_RoleTopicAuth(TestCase): - @classmethod def setUpClass(cls): cls.test_name = "test-role-topic" @@ -339,8 +482,15 @@ class Test_RoleTopicAuth(TestCase): self.auth = Mock(authconn.Authconn(None, None, None)) self.auth.role_permissions = self.test_operations self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth) - self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None, - "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} + self.fake_session = { + "username": test_name, + "project_id": (test_pid,), + "method": None, + "admin": True, + "force": False, + "public": False, + "allow_show_user_project_role": True, + } self.topic.check_quota = Mock(return_value=None) # skip quota def test_new_role(self): @@ -351,51 +501,95 @@ class Test_RoleTopicAuth(TestCase): perms_out = {"default": False, "admin": False, "tokens": True} self.auth.get_role_list.return_value = [] self.auth.create_role.return_value = rid1 - rid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in}) + rid2, oid = self.topic.new( + rollback, + self.fake_session, + {"name": self.test_name, "permissions": perms_in}, + ) self.assertEqual(len(rollback), 1, "Wrong rollback length") self.assertEqual(rid2, rid1, "Wrong project identifier") content = self.auth.create_role.call_args[0][0] self.assertEqual(content["name"], self.test_name, "Wrong role name") self.assertEqual(content["permissions"], perms_out, "Wrong permissions") self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") - self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") + self.assertEqual( + content["_admin"]["modified"], + content["_admin"]["created"], + "Wrong modification time", + ) with self.subTest(i=2): rollback = [] - with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e: - self.topic.new(rollback, self.fake_session, - {"name": "other-role-name", "permissions": {"projects": True}}) + with self.assertRaises( + EngineException, msg="Accepted wrong permissions" + ) as e: + self.topic.new( + rollback, + self.fake_session, + {"name": "other-role-name", "permissions": {"projects": True}}, + ) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("invalid permission '{}'".format("projects"), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "invalid permission '{}'".format("projects"), + norm(str(e.exception)), + "Wrong exception text", + ) def test_edit_role(self): now = time() rid = str(uuid4()) - role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True}, - "_admin": {"created": now, "modified": now}} + role = { + "_id": rid, + "name": self.test_name, + "permissions": {"tokens": True}, + "_admin": {"created": now, "modified": now}, + } with self.subTest(i=1): self.auth.get_role_list.side_effect = [[role], []] self.auth.get_role.return_value = role new_name = "new-role-name" perms_in = {"tokens": False, "tokens:get": True} - perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True} - self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in}) + perms_out = { + "default": False, + "admin": False, + "tokens": False, + "tokens:get": True, + } + self.topic.edit( + self.fake_session, rid, {"name": new_name, "permissions": perms_in} + ) content = self.auth.update_role.call_args[0][0] self.assertEqual(content["_id"], rid, "Wrong role identifier") self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") - self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time") + self.assertGreater( + content["_admin"]["modified"], now, "Wrong modification time" + ) self.assertEqual(content["name"], new_name, "Wrong role name") self.assertEqual(content["permissions"], perms_out, "Wrong permissions") with self.subTest(i=2): new_name = "other-role-name" perms_in = {"tokens": False, "tokens:post": True} self.auth.get_role_list.side_effect = [[role], []] - with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e: - self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in}) - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("invalid permission '{}'".format("tokens:post"), - norm(str(e.exception)), "Wrong exception text") + with self.assertRaises( + EngineException, msg="Accepted wrong permissions" + ) as e: + self.topic.edit( + self.fake_session, rid, {"name": new_name, "permissions": perms_in} + ) + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "invalid permission '{}'".format("tokens:post"), + norm(str(e.exception)), + "Wrong exception text", + ) def test_delete_role(self): with self.subTest(i=1): @@ -407,59 +601,123 @@ class Test_RoleTopicAuth(TestCase): self.auth.get_user_list.return_value = [] rc = self.topic.delete(self.fake_session, rid) self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info") - self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier") - self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier") - self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier") + self.assertEqual( + self.auth.get_role_list.call_args[0][0]["_id"], + rid, + "Wrong role identifier", + ) + self.assertEqual( + self.auth.get_role.call_args[0][0], rid, "Wrong role identifier" + ) + self.assertEqual( + self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier" + ) def test_conflict_on_new(self): with self.subTest(i=1): rollback = [] rid = str(uuid4()) - with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e: + with self.assertRaises( + EngineException, msg="Accepted uuid as role name" + ) as e: self.topic.new(rollback, self.fake_session, {"name": rid}) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("role name '{}' cannot have an uuid format".format(rid), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "role name '{}' cannot have an uuid format".format(rid), + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=2): rollback = [] - self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}] - with self.assertRaises(EngineException, msg="Accepted existing role name") as e: + self.auth.get_role_list.return_value = [ + {"_id": str(uuid4()), "name": self.test_name} + ] + with self.assertRaises( + EngineException, msg="Accepted existing role name" + ) as e: self.topic.new(rollback, self.fake_session, {"name": self.test_name}) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("role name '{}' exists".format(self.test_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "role name '{}' exists".format(self.test_name), + norm(str(e.exception)), + "Wrong exception text", + ) def test_conflict_on_edit(self): rid = str(uuid4()) with self.subTest(i=1): - self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}] + self.auth.get_role_list.return_value = [ + {"_id": rid, "name": self.test_name, "permissions": {}} + ] new_name = str(uuid4()) - with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e: + with self.assertRaises( + EngineException, msg="Accepted uuid as role name" + ) as e: self.topic.edit(self.fake_session, rid, {"name": new_name}) - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("role name '{}' cannot have an uuid format".format(new_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "role name '{}' cannot have an uuid format".format(new_name), + norm(str(e.exception)), + "Wrong exception text", + ) for i, role_name in enumerate(["system_admin", "project_admin"], start=2): with self.subTest(i=i): rid = str(uuid4()) - self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}} - with self.assertRaises(EngineException, msg="Accepted renaming of role '{}'".format(role_name)) as e: + self.auth.get_role.return_value = { + "_id": rid, + "name": role_name, + "permissions": {}, + } + with self.assertRaises( + EngineException, + msg="Accepted renaming of role '{}'".format(role_name), + ) as e: self.topic.edit(self.fake_session, rid, {"name": "new-name"}) - self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code") - self.assertIn("you cannot rename role '{}'".format(role_name), - norm(str(e.exception)), "Wrong exception text") - with self.subTest(i=i+1): + self.assertEqual( + e.exception.http_code, + HTTPStatus.FORBIDDEN, + "Wrong HTTP status code", + ) + self.assertIn( + "you cannot rename role '{}'".format(role_name), + norm(str(e.exception)), + "Wrong exception text", + ) + with self.subTest(i=i + 1): new_name = "new-role-name" - self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}], - [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]] - self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}} - with self.assertRaises(EngineException, msg="Accepted existing role name") as e: + self.auth.get_role_list.side_effect = [ + [{"_id": rid, "name": self.test_name, "permissions": {}}], + [{"_id": str(uuid4()), "name": new_name, "permissions": {}}], + ] + self.auth.get_role.return_value = { + "_id": rid, + "name": self.test_name, + "permissions": {}, + } + with self.assertRaises( + EngineException, msg="Accepted existing role name" + ) as e: self.topic.edit(self.fake_session, rid, {"name": new_name}) - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("role name '{}' exists".format(new_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "role name '{}' exists".format(new_name), + norm(str(e.exception)), + "Wrong exception text", + ) def test_conflict_on_del(self): for i, role_name in enumerate(["system_admin", "project_admin"], start=1): @@ -468,28 +726,51 @@ class Test_RoleTopicAuth(TestCase): role = {"_id": rid, "name": role_name} self.auth.get_role_list.return_value = [role] self.auth.get_role.return_value = role - with self.assertRaises(EngineException, msg="Accepted deletion of role '{}'".format(role_name)) as e: + with self.assertRaises( + EngineException, + msg="Accepted deletion of role '{}'".format(role_name), + ) as e: self.topic.delete(self.fake_session, rid) - self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code") - self.assertIn("you cannot delete role '{}'".format(role_name), - norm(str(e.exception)), "Wrong exception text") - with self.subTest(i=i+1): + self.assertEqual( + e.exception.http_code, + HTTPStatus.FORBIDDEN, + "Wrong HTTP status code", + ) + self.assertIn( + "you cannot delete role '{}'".format(role_name), + norm(str(e.exception)), + "Wrong exception text", + ) + with self.subTest(i=i + 1): rid = str(uuid4()) name = "other-role-name" role = {"_id": rid, "name": name} self.auth.get_role_list.return_value = [role] self.auth.get_role.return_value = role - self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name, - "project_role_mappings": [{"project": str(uuid4()), "role": rid}]}] - with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e: + self.auth.get_user_list.return_value = [ + { + "_id": str(uuid4()), + "username": self.test_name, + "project_role_mappings": [{"project": str(uuid4()), "role": rid}], + } + ] + with self.assertRaises( + EngineException, msg="Accepted deletion of used role" + ) as e: self.topic.delete(self.fake_session, rid) - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "role '{}' ({}) is being used by user '{}'".format( + name, rid, self.test_name + ), + norm(str(e.exception)), + "Wrong exception text", + ) class Test_UserTopicAuth(TestCase): - @classmethod def setUpClass(cls): cls.test_name = "test-user-topic" @@ -500,8 +781,15 @@ class Test_UserTopicAuth(TestCase): self.msg = Mock(msgbase.MsgBase()) self.auth = Mock(authconn.Authconn(None, None, None)) self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth) - self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None, - "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} + self.fake_session = { + "username": test_name, + "project_id": (test_pid,), + "method": None, + "admin": True, + "force": False, + "public": False, + "allow_show_user_project_role": True, + } self.topic.check_quota = Mock(return_value=None) # skip quota def test_new_user(self): @@ -516,18 +804,31 @@ class Test_UserTopicAuth(TestCase): self.auth.get_role.return_value = {"_id": rid, "name": "some_role"} prms_in = [{"project": "some_project", "role": "some_role"}] prms_out = [{"project": pid, "role": rid}] - uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name, - "password": self.test_name, - "project_role_mappings": prms_in - }) + uid2, oid = self.topic.new( + rollback, + self.fake_session, + { + "username": self.test_name, + "password": self.test_name, + "project_role_mappings": prms_in, + }, + ) self.assertEqual(len(rollback), 1, "Wrong rollback length") self.assertEqual(uid2, uid1, "Wrong project identifier") content = self.auth.create_user.call_args[0][0] self.assertEqual(content["username"], self.test_name, "Wrong project name") self.assertEqual(content["password"], self.test_name, "Wrong password") - self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings") + self.assertEqual( + content["project_role_mappings"], + prms_out, + "Wrong project-role mappings", + ) self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") - self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") + self.assertEqual( + content["_admin"]["modified"], + content["_admin"]["created"], + "Wrong modification time", + ) with self.subTest(i=2): rollback = [] def_rid = str(uuid4()) @@ -535,184 +836,380 @@ class Test_UserTopicAuth(TestCase): self.auth.get_role.return_value = def_role self.auth.get_role_list.return_value = [def_role] prms_out = [{"project": pid, "role": def_rid}] - uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name, - "password": self.test_name, - "projects": ["some_project"] - }) + uid2, oid = self.topic.new( + rollback, + self.fake_session, + { + "username": self.test_name, + "password": self.test_name, + "projects": ["some_project"], + }, + ) self.assertEqual(len(rollback), 1, "Wrong rollback length") self.assertEqual(uid2, uid1, "Wrong project identifier") content = self.auth.create_user.call_args[0][0] self.assertEqual(content["username"], self.test_name, "Wrong project name") self.assertEqual(content["password"], self.test_name, "Wrong password") - self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings") + self.assertEqual( + content["project_role_mappings"], + prms_out, + "Wrong project-role mappings", + ) self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") - self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") + self.assertEqual( + content["_admin"]["modified"], + content["_admin"]["created"], + "Wrong modification time", + ) with self.subTest(i=3): rollback = [] - with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e: - self.topic.new(rollback, self.fake_session, {"username": "other-project-name", - "password": "other-password", - "project_role_mappings": [{}] - }) + with self.assertRaises( + EngineException, msg="Accepted wrong project-role mappings" + ) as e: + self.topic.new( + rollback, + self.fake_session, + { + "username": "other-project-name", + "password": "other-password", + "project_role_mappings": [{}], + }, + ) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("format error at '{}' '{}'" - .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "format error at '{}' '{}'".format( + "project_role_mappings:{}", "'{}' is a required property" + ).format(0, "project"), + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=4): rollback = [] with self.assertRaises(EngineException, msg="Accepted wrong projects") as e: - self.topic.new(rollback, self.fake_session, {"username": "other-project-name", - "password": "other-password", - "projects": [] - }) + self.topic.new( + rollback, + self.fake_session, + { + "username": "other-project-name", + "password": "other-password", + "projects": [], + }, + ) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "format error at '{}' '{}'".format( + "projects", "{} is too short" + ).format([]), + norm(str(e.exception)), + "Wrong exception text", + ) def test_edit_user(self): now = time() uid = str(uuid4()) pid1 = str(uuid4()) rid1 = str(uuid4()) - prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}] - user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms, - "_admin": {"created": now, "modified": now}} + prms = [ + { + "project": pid1, + "project_name": "project-1", + "role": rid1, + "role_name": "role-1", + } + ] + user = { + "_id": uid, + "username": self.test_name, + "project_role_mappings": prms, + "_admin": {"created": now, "modified": now}, + } with self.subTest(i=1): self.auth.get_user_list.side_effect = [[user], []] self.auth.get_user.return_value = user pid2 = str(uuid4()) rid2 = str(uuid4()) - self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"}, - {"_id": pid1, "name": "project-1"}] - self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"}, - {"_id": rid1, "name": "role-1"}] + self.auth.get_project.side_effect = [ + {"_id": pid2, "name": "project-2"}, + {"_id": pid1, "name": "project-1"}, + ] + self.auth.get_role.side_effect = [ + {"_id": rid2, "name": "role-2"}, + {"_id": rid1, "name": "role-1"}, + ] new_name = "new-user-name" new_pasw = "new-password" add_prms = [{"project": pid2, "role": rid2}] rem_prms = [{"project": pid1, "role": rid1}] - self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw, - "add_project_role_mappings": add_prms, - "remove_project_role_mappings": rem_prms - }) + self.topic.edit( + self.fake_session, + uid, + { + "username": new_name, + "password": new_pasw, + "add_project_role_mappings": add_prms, + "remove_project_role_mappings": rem_prms, + }, + ) content = self.auth.update_user.call_args[0][0] self.assertEqual(content["_id"], uid, "Wrong user identifier") self.assertEqual(content["username"], new_name, "Wrong user name") self.assertEqual(content["password"], new_pasw, "Wrong user password") - self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add") - self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove") + self.assertEqual( + content["add_project_role_mappings"], + add_prms, + "Wrong project-role mappings to add", + ) + self.assertEqual( + content["remove_project_role_mappings"], + prms, + "Wrong project-role mappings to remove", + ) with self.subTest(i=2): new_name = "other-user-name" new_prms = [{}] self.auth.get_role_list.side_effect = [[user], []] self.auth.get_user_list.side_effect = [[user]] - with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e: - self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms}) - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("format error at '{}' '{}'" - .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"), - norm(str(e.exception)), "Wrong exception text") + with self.assertRaises( + EngineException, msg="Accepted wrong project-role mappings" + ) as e: + self.topic.edit( + self.fake_session, + uid, + {"username": new_name, "project_role_mappings": new_prms}, + ) + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "format error at '{}' '{}'".format( + "project_role_mappings:{}", "'{}' is a required property" + ).format(0, "project"), + norm(str(e.exception)), + "Wrong exception text", + ) def test_delete_user(self): with self.subTest(i=1): uid = str(uuid4()) self.fake_session["username"] = self.test_name - user = user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []} + user = user = { + "_id": uid, + "username": "other-user-name", + "project_role_mappings": [], + } self.auth.get_user.return_value = user self.auth.delete_user.return_value = {"deleted": 1} rc = self.topic.delete(self.fake_session, uid) self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info") - self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier") - self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier") + self.assertEqual( + self.auth.get_user.call_args[0][0], uid, "Wrong user identifier" + ) + self.assertEqual( + self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier" + ) def test_conflict_on_new(self): with self.subTest(i=1): rollback = [] uid = str(uuid4()) - with self.assertRaises(EngineException, msg="Accepted uuid as username") as e: - self.topic.new(rollback, self.fake_session, {"username": uid, "password": self.test_name, - "projects": [test_pid]}) + with self.assertRaises( + EngineException, msg="Accepted uuid as username" + ) as e: + self.topic.new( + rollback, + self.fake_session, + { + "username": uid, + "password": self.test_name, + "projects": [test_pid], + }, + ) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("username '{}' cannot have a uuid format".format(uid), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "username '{}' cannot have a uuid format".format(uid), + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=2): rollback = [] - self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}] - with self.assertRaises(EngineException, msg="Accepted existing username") as e: - self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name, - "projects": [test_pid]}) + self.auth.get_user_list.return_value = [ + {"_id": str(uuid4()), "username": self.test_name} + ] + with self.assertRaises( + EngineException, msg="Accepted existing username" + ) as e: + self.topic.new( + rollback, + self.fake_session, + { + "username": self.test_name, + "password": self.test_name, + "projects": [test_pid], + }, + ) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("username '{}' is already used".format(self.test_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "username '{}' is already used".format(self.test_name), + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=3): rollback = [] self.auth.get_user_list.return_value = [] self.auth.get_role_list.side_effect = [[], []] - with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e: - self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name, - "projects": [str(uuid4())]}) + with self.assertRaises( + AuthconnNotFoundException, msg="Accepted user without default role" + ) as e: + self.topic.new( + rollback, + self.fake_session, + { + "username": self.test_name, + "password": self.test_name, + "projects": [str(uuid4())], + }, + ) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code") - self.assertIn("can't find default role for user '{}'".format(self.test_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code" + ) + self.assertIn( + "can't find default role for user '{}'".format(self.test_name), + norm(str(e.exception)), + "Wrong exception text", + ) def test_conflict_on_edit(self): uid = str(uuid4()) with self.subTest(i=1): - self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}] + self.auth.get_user_list.return_value = [ + {"_id": uid, "username": self.test_name} + ] new_name = str(uuid4()) - with self.assertRaises(EngineException, msg="Accepted uuid as username") as e: + with self.assertRaises( + EngineException, msg="Accepted uuid as username" + ) as e: self.topic.edit(self.fake_session, uid, {"username": new_name}) - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("username '{}' cannot have an uuid format".format(new_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "username '{}' cannot have an uuid format".format(new_name), + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=2): - self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}] + self.auth.get_user_list.return_value = [ + {"_id": uid, "username": self.test_name} + ] self.auth.get_role_list.side_effect = [[], []] - with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e: + with self.assertRaises( + AuthconnNotFoundException, msg="Accepted user without default role" + ) as e: self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]}) - self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code") - self.assertIn("can't find a default role for user '{}'".format(self.test_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code" + ) + self.assertIn( + "can't find a default role for user '{}'".format(self.test_name), + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=3): admin_uid = str(uuid4()) - self.auth.get_user_list.return_value = [{"_id": admin_uid, "username": "admin"}] - with self.assertRaises(EngineException, msg="Accepted removing system_admin role from admin user") as e: - self.topic.edit(self.fake_session, admin_uid, - {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]}) - self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code") - self.assertIn("you cannot remove system_admin role from admin user", - norm(str(e.exception)), "Wrong exception text") + self.auth.get_user_list.return_value = [ + {"_id": admin_uid, "username": "admin"} + ] + with self.assertRaises( + EngineException, + msg="Accepted removing system_admin role from admin user", + ) as e: + self.topic.edit( + self.fake_session, + admin_uid, + { + "remove_project_role_mappings": [ + {"project": "admin", "role": "system_admin"} + ] + }, + ) + self.assertEqual( + e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code" + ) + self.assertIn( + "you cannot remove system_admin role from admin user", + norm(str(e.exception)), + "Wrong exception text", + ) with self.subTest(i=4): new_name = "new-user-name" - self.auth.get_user_list.side_effect = [[{"_id": uid, "name": self.test_name}], - [{"_id": str(uuid4()), "name": new_name}]] - with self.assertRaises(EngineException, msg="Accepted existing username") as e: + self.auth.get_user_list.side_effect = [ + [{"_id": uid, "name": self.test_name}], + [{"_id": str(uuid4()), "name": new_name}], + ] + with self.assertRaises( + EngineException, msg="Accepted existing username" + ) as e: self.topic.edit(self.fake_session, uid, {"username": new_name}) - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("username '{}' is already used".format(new_name), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "username '{}' is already used".format(new_name), + norm(str(e.exception)), + "Wrong exception text", + ) def test_conflict_on_del(self): with self.subTest(i=1): uid = str(uuid4()) self.fake_session["username"] = self.test_name - user = user = {"_id": uid, "username": self.test_name, "project_role_mappings": []} + user = user = { + "_id": uid, + "username": self.test_name, + "project_role_mappings": [], + } self.auth.get_user.return_value = user - with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e: + with self.assertRaises( + EngineException, msg="Accepted deletion of own user" + ) as e: self.topic.delete(self.fake_session, uid) - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("you cannot delete your own login user", norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "you cannot delete your own login user", + norm(str(e.exception)), + "Wrong exception text", + ) class Test_CommonVimWimSdn(TestCase): - @classmethod def setUpClass(cls): - cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager + cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager def setUp(self): self.db = Mock(dbbase.DbBase()) @@ -725,8 +1222,15 @@ class Test_CommonVimWimSdn(TestCase): self.topic.topic = "wims" self.topic.schema_new = validation.wim_account_new_schema self.topic.schema_edit = validation.wim_account_edit_schema - self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None, - "admin": True, "force": False, "public": False, "allow_show_user_project_role": True} + self.fake_session = { + "username": test_name, + "project_id": (test_pid,), + "method": None, + "admin": True, + "force": False, + "public": False, + "allow_show_user_project_role": True, + } self.topic.check_quota = Mock(return_value=None) # skip quota def test_new_cvws(self): @@ -736,8 +1240,11 @@ class Test_CommonVimWimSdn(TestCase): test_type = "fake" self.db.get_one.return_value = None self.db.create.side_effect = lambda self, content: content["_id"] - cid, oid = self.topic.new(rollback, self.fake_session, - {"name": self.test_name, "wim_url": test_url, "wim_type": test_type}) + cid, oid = self.topic.new( + rollback, + self.fake_session, + {"name": self.test_name, "wim_url": test_url, "wim_type": test_type}, + ) self.assertEqual(len(rollback), 1, "Wrong rollback length") args = self.db.create.call_args[0] content = args[1] @@ -749,20 +1256,55 @@ class Test_CommonVimWimSdn(TestCase): self.assertEqual(content["schema_version"], "1.11", "Wrong schema version") self.assertEqual(content["op_id"], oid, "Wrong operation identifier") self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") - self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") - self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state") - self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects") - self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects") - self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation") - self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations") + self.assertEqual( + content["_admin"]["modified"], + content["_admin"]["created"], + "Wrong modification time", + ) + self.assertEqual( + content["_admin"]["operationalState"], + "PROCESSING", + "Wrong operational state", + ) + self.assertEqual( + content["_admin"]["projects_read"], + [test_pid], + "Wrong read-only projects", + ) + self.assertEqual( + content["_admin"]["projects_write"], + [test_pid], + "Wrong read/write projects", + ) + self.assertIsNone( + content["_admin"]["current_operation"], "Wrong current operation" + ) + self.assertEqual( + len(content["_admin"]["operations"]), 1, "Wrong number of operations" + ) operation = content["_admin"]["operations"][0] - self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type") - self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state") - self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time") - self.assertGreater(operation["statusEnteredTime"], content["_admin"]["created"], - "Wrong operation status enter time") - self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info") - self.assertIsNone(operation["operationParams"], "Wrong operation parameters") + self.assertEqual( + operation["lcmOperationType"], "create", "Wrong operation type" + ) + self.assertEqual( + operation["operationState"], "PROCESSING", "Wrong operation state" + ) + self.assertGreater( + operation["startTime"], + content["_admin"]["created"], + "Wrong operation start time", + ) + self.assertGreater( + operation["statusEnteredTime"], + content["_admin"]["created"], + "Wrong operation status enter time", + ) + self.assertEqual( + operation["detailed-status"], "", "Wrong operation detailed status info" + ) + self.assertIsNone( + operation["operationParams"], "Wrong operation parameters" + ) # This test is disabled. From Feature 8030 we admit all WIM/SDN types # with self.subTest(i=2): # rollback = [] @@ -781,21 +1323,46 @@ class Test_CommonVimWimSdn(TestCase): test_url = "http://0.0.0.0:0" test_type = "fake" self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name} - with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e: - self.topic.new(rollback, self.fake_session, - {"name": self.test_name, "wim_url": test_url, "wim_type": test_type}) + with self.assertRaises( + EngineException, msg="Accepted existing CIM name" + ) as e: + self.topic.new( + rollback, + self.fake_session, + { + "name": self.test_name, + "wim_url": test_url, + "wim_type": test_type, + }, + ) self.assertEqual(len(rollback), 0, "Wrong rollback length") - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "name '{}' already exists for {}".format( + self.test_name, self.topic.topic + ), + norm(str(e.exception)), + "Wrong exception text", + ) def test_edit_cvws(self): now = time() cid = str(uuid4()) test_url = "http://0.0.0.0:0" test_type = "fake" - cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type, - "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}} + cvws = { + "_id": cid, + "name": self.test_name, + "wim_url": test_url, + "wim_type": test_type, + "_admin": { + "created": now, + "modified": now, + "operations": [{"lcmOperationType": "create"}], + }, + } with self.subTest(i=1): new_name = "new-cim-name" new_url = "https://1.1.1.1:1" @@ -803,7 +1370,11 @@ class Test_CommonVimWimSdn(TestCase): self.db.get_one.side_effect = [cvws, None] self.db.replace.return_value = {"updated": 1} # self.db.encrypt.side_effect = [b64str(), b64str()] - self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type}) + self.topic.edit( + self.fake_session, + cid, + {"name": new_name, "wim_url": new_url, "wim_type": new_type}, + ) args = self.db.replace.call_args[0] content = args[2] self.assertEqual(args[0], self.topic.topic, "Wrong topic") @@ -813,36 +1384,78 @@ class Test_CommonVimWimSdn(TestCase): self.assertEqual(content["wim_type"], new_type, "Wrong CIM type") self.assertEqual(content["wim_url"], new_url, "Wrong URL") self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") - self.assertGreater(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time") - self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations") + self.assertGreater( + content["_admin"]["modified"], + content["_admin"]["created"], + "Wrong modification time", + ) + self.assertEqual( + len(content["_admin"]["operations"]), 2, "Wrong number of operations" + ) operation = content["_admin"]["operations"][1] - self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type") - self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state") - self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time") - self.assertGreater(operation["statusEnteredTime"], content["_admin"]["modified"], - "Wrong operation status enter time") - self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info") - self.assertIsNone(operation["operationParams"], "Wrong operation parameters") + self.assertEqual( + operation["lcmOperationType"], "edit", "Wrong operation type" + ) + self.assertEqual( + operation["operationState"], "PROCESSING", "Wrong operation state" + ) + self.assertGreater( + operation["startTime"], + content["_admin"]["modified"], + "Wrong operation start time", + ) + self.assertGreater( + operation["statusEnteredTime"], + content["_admin"]["modified"], + "Wrong operation status enter time", + ) + self.assertEqual( + operation["detailed-status"], "", "Wrong operation detailed status info" + ) + self.assertIsNone( + operation["operationParams"], "Wrong operation parameters" + ) with self.subTest(i=2): self.db.get_one.side_effect = [cvws] with self.assertRaises(EngineException, msg="Accepted wrong property") as e: - self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"}) - self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") - self.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)"). - format("extra_prop"), - norm(str(e.exception)), "Wrong exception text") + self.topic.edit( + self.fake_session, + str(uuid4()), + {"name": "new-name", "extra_prop": "anything"}, + ) + self.assertEqual( + e.exception.http_code, + HTTPStatus.UNPROCESSABLE_ENTITY, + "Wrong HTTP status code", + ) + self.assertIn( + "format error '{}'".format( + "additional properties are not allowed ('{}' was unexpected)" + ).format("extra_prop"), + norm(str(e.exception)), + "Wrong exception text", + ) def test_conflict_on_edit(self): with self.subTest(i=1): cid = str(uuid4()) new_name = "new-cim-name" - self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name}, - {"_id": str(uuid4()), "name": new_name}] - with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e: + self.db.get_one.side_effect = [ + {"_id": cid, "name": self.test_name}, + {"_id": str(uuid4()), "name": new_name}, + ] + with self.assertRaises( + EngineException, msg="Accepted existing CIM name" + ) as e: self.topic.edit(self.fake_session, cid, {"name": new_name}) - self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code") - self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic), - norm(str(e.exception)), "Wrong exception text") + self.assertEqual( + e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" + ) + self.assertIn( + "name '{}' already exists for {}".format(new_name, self.topic.topic), + norm(str(e.exception)), + "Wrong exception text", + ) def test_delete_cvws(self): cid = str(uuid4()) @@ -851,57 +1464,119 @@ class Test_CommonVimWimSdn(TestCase): cvws = {"_id": cid, "name": self.test_name} self.db.get_list.return_value = [] with self.subTest(i=1): - cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]} + cvws["_admin"] = { + "projects_read": [test_pid, ro_pid, rw_pid], + "projects_write": [test_pid, rw_pid], + } self.db.get_one.return_value = cvws oid = self.topic.delete(self.fake_session, cid) self.assertIsNone(oid, "Wrong operation identifier") - self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic") - self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier") - self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic") - self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier") - self.assertEqual(self.db.set_one.call_args[1]["update_dict"], None, - "Wrong read-only projects update") - self.assertEqual(self.db.set_one.call_args[1]["pull_list"], - {"_admin.projects_read": (test_pid,), "_admin.projects_write": (test_pid,)}, - "Wrong read/write projects update") + self.assertEqual( + self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic" + ) + self.assertEqual( + self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" + ) + self.assertEqual( + self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic" + ) + self.assertEqual( + self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" + ) + self.assertEqual( + self.db.set_one.call_args[1]["update_dict"], + None, + "Wrong read-only projects update", + ) + self.assertEqual( + self.db.set_one.call_args[1]["pull_list"], + { + "_admin.projects_read": (test_pid,), + "_admin.projects_write": (test_pid,), + }, + "Wrong read/write projects update", + ) self.topic._send_msg.assert_not_called() with self.subTest(i=2): now = time() - cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []} + cvws["_admin"] = { + "projects_read": [test_pid], + "projects_write": [test_pid], + "operations": [], + } self.db.get_one.return_value = cvws oid = self.topic.delete(self.fake_session, cid) - self.assertEqual(oid, cid+":0", "Wrong operation identifier") - self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic") - self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier") - self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic") - self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier") - self.assertEqual(self.db.set_one.call_args[1]["update_dict"], {"_admin.to_delete": True}, - "Wrong _admin.to_delete update") + self.assertEqual(oid, cid + ":0", "Wrong operation identifier") + self.assertEqual( + self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic" + ) + self.assertEqual( + self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" + ) + self.assertEqual( + self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic" + ) + self.assertEqual( + self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier" + ) + self.assertEqual( + self.db.set_one.call_args[1]["update_dict"], + {"_admin.to_delete": True}, + "Wrong _admin.to_delete update", + ) operation = self.db.set_one.call_args[1]["push"]["_admin.operations"] - self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type") - self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state") - self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status") - self.assertIsNone(operation["operationParams"], "Wrong operation parameters") - self.assertGreater(operation["startTime"], now, "Wrong operation start time") - self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time") - self.topic._send_msg.assert_called_once_with("delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None) + self.assertEqual( + operation["lcmOperationType"], "delete", "Wrong operation type" + ) + self.assertEqual( + operation["operationState"], "PROCESSING", "Wrong operation state" + ) + self.assertEqual( + operation["detailed-status"], "", "Wrong operation detailed status" + ) + self.assertIsNone( + operation["operationParams"], "Wrong operation parameters" + ) + self.assertGreater( + operation["startTime"], now, "Wrong operation start time" + ) + self.assertGreater( + operation["statusEnteredTime"], now, "Wrong operation status enter time" + ) + self.topic._send_msg.assert_called_once_with( + "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None + ) with self.subTest(i=3): - cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []} + cvws["_admin"] = { + "projects_read": [], + "projects_write": [], + "operations": [], + } self.db.get_one.return_value = cvws self.topic._send_msg.reset_mock() self.db.get_one.reset_mock() self.db.del_one.reset_mock() - self.fake_session["force"] = True # to force deletion - self.fake_session["admin"] = True # to force deletion - self.fake_session["project_id"] = [] # to force deletion + self.fake_session["force"] = True # to force deletion + self.fake_session["admin"] = True # to force deletion + self.fake_session["project_id"] = [] # to force deletion oid = self.topic.delete(self.fake_session, cid) self.assertIsNone(oid, "Wrong operation identifier") - self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic") - self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier") - self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic") - self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier") - self.topic._send_msg.assert_called_once_with("deleted", {"_id": cid, "op_id": None}, not_send_msg=None) + self.assertEqual( + self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic" + ) + self.assertEqual( + self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" + ) + self.assertEqual( + self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic" + ) + self.assertEqual( + self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" + ) + self.topic._send_msg.assert_called_once_with( + "deleted", {"_id": cid, "op_id": None}, not_send_msg=None + ) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()