Modifications for test of feature 7953
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 from unittest import TestCase
22 from unittest.mock import Mock
23 from uuid import uuid4
24 from http import HTTPStatus
25 from time import time
26 from random import randint
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import ProjectTopicAuth, RoleTopicAuth, UserTopicAuth, CommonVimWimSdn
30 from osm_nbi.engine import EngineException
31 from osm_nbi.authconn import AuthconnNotFoundException
32
33
34 test_pid = str(uuid4())
35 test_name = "test-user"
36
37
def norm(str):
    """Normalize a string so that messages can be compared loosely.

    Leading and trailing whitespace is removed, every internal run of
    whitespace is collapsed into a single blank and the result is lower-cased.

    :param str: text to normalize (the name shadows the builtin; it is kept
        unchanged so that the signature stays the same)
    :return: the normalized text
    """
    # split() without arguments already discards leading/trailing whitespace
    words = str.split()
    collapsed = " ".join(words)
    return collapsed.lower()
41
42
class Test_ProjectTopicAuth(TestCase):
    """Exercise ProjectTopicAuth against mocked db, fs, msg and auth objects."""

    @classmethod
    def setUpClass(cls):
        """Define the name used for the session user and for the projects under test."""
        cls.test_name = "test-project-topic"

    def setUp(self):
        """Build fresh mocks, the topic under test and the fake session for each test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None))
        self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": self.test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }

    def test_new_project(self):
        """Create a project with empty quotas, then check that unknown quota items are refused."""
        with self.subTest(i=1):
            rollback = []
            expected_pid = str(uuid4())
            self.auth.get_project_list.return_value = []
            self.auth.create_project.return_value = expected_pid
            indata = {"name": self.test_name, "quotas": {}}
            returned_pid, _ = self.topic.new(rollback, self.fake_session, indata)
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(returned_pid, expected_pid, "Wrong project identifier")
            # Inspect the document that was handed to the auth backend
            content = self.auth.create_project.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong project name")
            self.assertEqual(content["quotas"], {}, "Wrong quotas")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=2):
            rollback = []
            bad_indata = {"name": "other-project-name", "quotas": {"baditems": 10}}
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as ctx:
                self.topic.new(rollback, self.fake_session, bad_indata)
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(ctx.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            expected_text = "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'" \
                .format("baditems")
            self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")

    def test_edit_project(self):
        """Rename a project and set valid quotas, then check that an unknown quota item is refused."""
        created_at = time()
        pid = str(uuid4())
        stored = {"_id": pid, "name": self.test_name, "_admin": {"created": created_at, "modified": created_at}}
        with self.subTest(i=1):
            # Successive get_project_list calls return the stored project and then an empty list
            self.auth.get_project_list.side_effect = [[stored], []]
            new_name = "new-project-name"
            quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
            self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
            updated_id, content = self.auth.update_project.call_args[0]
            self.assertEqual(updated_id, pid, "Wrong project identifier")
            self.assertEqual(content["_id"], pid, "Wrong project identifier")
            self.assertEqual(content["_admin"]["created"], created_at, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], created_at, "Wrong modification time")
            self.assertEqual(content["name"], new_name, "Wrong project name")
            self.assertEqual(content["quotas"], quotas, "Wrong quotas")
        with self.subTest(i=2):
            new_name = "other-project-name"
            quotas = {"baditems": randint(0, 100)}
            self.auth.get_project_list.side_effect = [[stored], []]
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as ctx:
                self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
            self.assertEqual(ctx.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            expected_text = "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'" \
                .format("baditems")
            self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")

    def test_conflict_on_new(self):
        """Check that uuid-like names and already existing names are refused on creation."""
        with self.subTest(i=1):
            rollback = []
            uuid_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as ctx:
                self.topic.new(rollback, self.fake_session, {"name": uuid_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(ctx.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            expected_text = "project name '{}' cannot have an uuid format".format(uuid_name)
            self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")
        with self.subTest(i=2):
            rollback = []
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing project name") as ctx:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            expected_text = "project '{}' exists".format(self.test_name)
            self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Check that uuid-like names, renaming of 'admin' and names already in use are refused on edition."""
        with self.subTest(i=1):
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            uuid_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as ctx:
                self.topic.edit(self.fake_session, test_pid, {"name": uuid_name})
            self.assertEqual(ctx.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            expected_text = "project name '{}' cannot have an uuid format".format(uuid_name)
            self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")
        with self.subTest(i=2):
            pid = str(uuid4())
            self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
            with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as ctx:
                self.topic.edit(self.fake_session, pid, {"name": "new-name"})
            self.assertEqual(ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            expected_text = "you cannot rename project 'admin'"
            self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")
        with self.subTest(i=3):
            new_name = "new-project-name"
            current = [{"_id": test_pid, "name": self.test_name}]
            same_name = [{"_id": str(uuid4()), "name": new_name}]
            self.auth.get_project_list.side_effect = [current, same_name]
            # NOTE(review): the identifier edited here is the one generated in sub-test 2, while the
            # mocked lookup returns a project whose _id is test_pid - confirm this is intended
            with self.assertRaises(EngineException, msg="Accepted existing project name") as ctx:
                self.topic.edit(self.fake_session, pid, {"name": new_name})
            self.assertEqual(ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            expected_text = "project '{}' is already used".format(new_name)
            self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")

    def test_delete_project(self):
        """Delete a project that no user and no database item references."""
        with self.subTest(i=1):
            pid = str(uuid4())
            deletion_info = {"deleted": 1}
            self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"}
            self.auth.delete_project.return_value = deletion_info
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = []
            result = self.topic.delete(self.fake_session, pid)
            self.assertEqual(result, {"deleted": 1}, "Wrong project deletion return info")
            self.assertEqual(self.auth.get_project.call_args[0][0], pid, "Wrong project identifier")
            self.assertEqual(self.auth.delete_project.call_args[0][0], pid, "Wrong project identifier")

    def test_conflict_on_del(self):
        """Check that deleting the own project, project 'admin' or a project in use is refused."""
        with self.subTest(i=1):
            self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name}
            with self.assertRaises(EngineException, msg="Accepted deletion of own project") as ctx:
                self.topic.delete(self.fake_session, self.test_name)
            self.assertEqual(ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("you cannot delete your own project", norm(str(ctx.exception)), "Wrong exception text")
        with self.subTest(i=2):
            self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
            with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as ctx:
                self.topic.delete(self.fake_session, "admin")
            self.assertEqual(ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("you cannot delete project 'admin'", norm(str(ctx.exception)), "Wrong exception text")
        with self.subTest(i=3):
            pid = str(uuid4())
            name = "other-project-name"
            self.auth.get_project.return_value = {"_id": pid, "name": name}
            # A user whose project-role mappings reference the project
            self.auth.get_user_list.return_value = [
                {"_id": str(uuid4()), "username": self.test_name,
                 "project_role_mappings": [{"project": pid, "role": str(uuid4())}]}
            ]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as ctx:
                self.topic.delete(self.fake_session, pid)
            self.assertEqual(ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            expected_text = "project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name)
            self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")
        with self.subTest(i=4):
            # Reuses pid and name from sub-test 3; now a database item references the project
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = [
                {"_id": str(uuid4()), "id": self.test_name,
                 "_admin": {"projects_read": [pid], "projects_write": []}}
            ]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as ctx:
                self.topic.delete(self.fake_session, pid)
            self.assertEqual(ctx.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            expected_text = "project '{}' ({}) is being used by {} '{}'" \
                .format(name, pid, "vnf descriptor", self.test_name)
            self.assertIn(expected_text, norm(str(ctx.exception)), "Wrong exception text")
200
201
class Test_RoleTopicAuth(TestCase):
    """Exercise RoleTopicAuth against mocked db, fs, msg and auth objects."""

    @classmethod
    def setUpClass(cls):
        """Define the role name and the list of operations handed to the topic."""
        cls.test_name = "test-role-topic"
        cls.test_operations = ["tokens:get"]

    def setUp(self):
        """Build fresh mocks, the topic under test and the fake session for each test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None))
        self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth, self.test_operations)
        self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}

    def test_new_role(self):
        """Create a role with valid permissions, then check that an unknown permission is refused."""
        with self.subTest(i=1):
            rollback = []
            rid1 = str(uuid4())
            perms_in = {"tokens": True}
            perms_out = {"default": False, "admin": False, "tokens": True}
            self.auth.get_role_list.return_value = []
            self.auth.create_role.return_value = rid1
            # The second returned value is not checked by this test
            rid2, _ = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in})
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(rid2, rid1, "Wrong role identifier")
            content = self.auth.create_role.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
                self.topic.new(rollback, self.fake_session,
                               {"name": "other-role-name", "permissions": {"projects": True}})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("invalid permission '{}'".format("projects"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_edit_role(self):
        """Rename a role and change its permissions, then check that an unknown permission is refused."""
        now = time()
        rid = str(uuid4())
        role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True},
                "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            # Successive get_role_list calls return the stored role and then an empty list
            self.auth.get_role_list.side_effect = [[role], []]
            self.auth.get_role.return_value = role
            new_name = "new-role-name"
            perms_in = {"tokens": False, "tokens:get": True}
            perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True}
            self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            content = self.auth.update_role.call_args[0][0]
            self.assertEqual(content["_id"], rid, "Wrong role identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
            self.assertEqual(content["name"], new_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
        with self.subTest(i=2):
            new_name = "other-role-name"
            perms_in = {"tokens": False, "tokens:post": True}
            self.auth.get_role_list.side_effect = [[role], []]
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("invalid permission '{}'".format("tokens:post"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_role(self):
        """Delete a role that no user references."""
        with self.subTest(i=1):
            rid = str(uuid4())
            role = {"_id": rid, "name": "other-role-name"}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.delete_role.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            rc = self.topic.delete(self.fake_session, rid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
            self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier")
            self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier")
            self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier")

    def test_conflict_on_new(self):
        """Check that uuid-like names and already existing names are refused on creation."""
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
                self.topic.new(rollback, self.fake_session, {"name": rid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("role name '{}' cannot have an uuid format".format(rid),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            rollback = []
            self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role name '{}' exists".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Check that uuid-like names, renaming of system roles and names already in use are refused."""
        rid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("role name '{}' cannot have an uuid format".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")
        # One sub-test (numbered 2 and 3) per role name that must not be renamed
        for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
            with self.subTest(i=i):
                rid = str(uuid4())
                self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}}
                with self.assertRaises(EngineException, msg="Accepted renaming of role '{}'".format(role_name)) as e:
                    self.topic.edit(self.fake_session, rid, {"name": "new-name"})
                self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
                self.assertIn("you cannot rename role '{}'".format(role_name),
                              norm(str(e.exception)), "Wrong exception text")
        # The loop variable is reused so that the numbering continues after the loop;
        # rid is the identifier generated in the last loop iteration
        with self.subTest(i=i+1):
            new_name = "new-role-name"
            self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}],
                                                   [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]]
            self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}}
            with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role name '{}' exists".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_del(self):
        """Check that deleting a system role or a role referenced by a user is refused."""
        # One sub-test (numbered 1 and 2) per role name that must not be deleted
        for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
            with self.subTest(i=i):
                rid = str(uuid4())
                role = {"_id": rid, "name": role_name}
                self.auth.get_role_list.return_value = [role]
                self.auth.get_role.return_value = role
                with self.assertRaises(EngineException, msg="Accepted deletion of role '{}'".format(role_name)) as e:
                    self.topic.delete(self.fake_session, rid)
                self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
                self.assertIn("you cannot delete role '{}'".format(role_name),
                              norm(str(e.exception)), "Wrong exception text")
        # The loop variable is reused so that the numbering continues after the loop
        with self.subTest(i=i+1):
            rid = str(uuid4())
            name = "other-role-name"
            role = {"_id": rid, "name": name}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            # A user whose project-role mappings reference the role
            self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
                                                     "project_role_mappings": [{"project": str(uuid4()), "role": rid}]}]
            with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e:
                self.topic.delete(self.fake_session, rid)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
361
362
class Test_UserTopicAuth(TestCase):
    """Exercise UserTopicAuth against mocked db, fs, msg and auth objects."""

    @classmethod
    def setUpClass(cls):
        """Define the user name shared by the tests of this class."""
        cls.test_name = "test-user-topic"

    def setUp(self):
        """Build fresh mocks, the topic under test and the fake session for each test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None))
        self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}

    def test_new_user(self):
        """Create users from project-role mappings and from a project list, then check malformed input."""
        uid1 = str(uuid4())
        pid = str(uuid4())
        self.auth.get_user_list.return_value = []
        self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
        self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
            # Mappings are supplied by name and are expected to reach the backend by identifier
            prms_in = [{"project": "some_project", "role": "some_role"}]
            prms_out = [{"project": pid, "role": rid}]
            # The second returned value is not checked by this test
            uid2, _ = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
                                                                   "password": self.test_name,
                                                                   "project_role_mappings": prms_in
                                                                   })
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(uid2, uid1, "Wrong user identifier")
            content = self.auth.create_user.call_args[0][0]
            self.assertEqual(content["username"], self.test_name, "Wrong user name")
            self.assertEqual(content["password"], self.test_name, "Wrong password")
            self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=2):
            rollback = []
            def_rid = str(uuid4())
            def_role = {"_id": def_rid, "name": "project_admin"}
            self.auth.get_role.return_value = def_role
            self.auth.get_role_list.return_value = [def_role]
            prms_out = [{"project": pid, "role": def_rid}]
            uid2, _ = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
                                                                   "password": self.test_name,
                                                                   "projects": ["some_project"]
                                                                   })
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(uid2, uid1, "Wrong user identifier")
            content = self.auth.create_user.call_args[0][0]
            self.assertEqual(content["username"], self.test_name, "Wrong user name")
            self.assertEqual(content["password"], self.test_name, "Wrong password")
            self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=3):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
                self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
                                                             "password": "other-password",
                                                             "project_role_mappings": [{}]
                                                             })
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            # Two formatting passes: the first builds the template, the second fills index and key
            self.assertIn("format error at '{}' '{}'"
                          .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=4):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
                self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
                                                             "password": "other-password",
                                                             "projects": []
                                                             })
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]),
                          norm(str(e.exception)), "Wrong exception text")

    def test_edit_user(self):
        """Change name, password and mappings of a user, then check that malformed mappings are refused."""
        now = time()
        uid = str(uuid4())
        pid1 = str(uuid4())
        rid1 = str(uuid4())
        prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}]
        user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms,
                "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            # Successive get_user_list calls return the stored user and then an empty list
            self.auth.get_user_list.side_effect = [[user], []]
            self.auth.get_user.return_value = user
            pid2 = str(uuid4())
            rid2 = str(uuid4())
            self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"},
                                                 {"_id": pid1, "name": "project-1"}]
            self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"},
                                              {"_id": rid1, "name": "role-1"}]
            new_name = "new-user-name"
            new_pasw = "new-password"
            add_prms = [{"project": pid2, "role": rid2}]
            rem_prms = [{"project": pid1, "role": rid1}]
            self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw,
                                                     "add_project_role_mappings": add_prms,
                                                     "remove_project_role_mappings": rem_prms
                                                     })
            content = self.auth.update_user.call_args[0][0]
            self.assertEqual(content["_id"], uid, "Wrong user identifier")
            self.assertEqual(content["username"], new_name, "Wrong user name")
            self.assertEqual(content["password"], new_pasw, "Wrong user password")
            self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add")
            # The mappings to remove are compared with the stored ones, which also carry the names
            self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove")
        with self.subTest(i=2):
            new_name = "other-user-name"
            new_prms = [{}]
            # NOTE(review): a user record is queued on get_role_list here; presumably
            # get_user_list was intended - confirm against UserTopicAuth.edit
            self.auth.get_role_list.side_effect = [[user], []]
            with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
                self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error at '{}' '{}'"
                          .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_user(self):
        """Delete a user other than the one named in the session."""
        with self.subTest(i=1):
            uid = str(uuid4())
            self.fake_session["username"] = self.test_name
            user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []}
            self.auth.get_user.return_value = user
            self.auth.delete_user.return_value = {"deleted": 1}
            rc = self.topic.delete(self.fake_session, uid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
            self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier")
            self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier")

    def test_conflict_on_new(self):
        """Check that uuid-like names, existing names and a missing default role are refused on creation."""
        with self.subTest(i=1):
            rollback = []
            uid = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
                self.topic.new(rollback, self.fake_session, {"username": uid, "password": self.test_name,
                                                             "projects": [test_pid]})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("username '{}' cannot have a uuid format".format(uid),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            rollback = []
            self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing username") as e:
                self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
                                                             "projects": [test_pid]})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("username '{}' is already used".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=3):
            rollback = []
            self.auth.get_user_list.return_value = []
            # Every role lookup returns an empty list
            self.auth.get_role_list.side_effect = [[], []]
            with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
                self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
                                                             "projects": [str(uuid4())]})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
            self.assertIn("can't find default role for user '{}'".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Check the editions that must be refused.

        Covers a uuid-like username, a missing default role, removal of the
        system_admin role from the admin user and a username already in use.
        """
        uid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
                self.topic.edit(self.fake_session, uid, {"username": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("username '{}' cannot have an uuid format".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
            # Every role lookup returns an empty list
            self.auth.get_role_list.side_effect = [[], []]
            with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
                self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
            self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
            self.assertIn("can't find a default role for user '{}'".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=3):
            admin_uid = str(uuid4())
            self.auth.get_user_list.return_value = [{"_id": admin_uid, "username": "admin"}]
            with self.assertRaises(EngineException, msg="Accepted removing system_admin role from admin user") as e:
                self.topic.edit(self.fake_session, admin_uid,
                                {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]})
            self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
            self.assertIn("you cannot remove system_admin role from admin user",
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=4):
            new_name = "new-user-name"
            self.auth.get_user_list.side_effect = [[{"_id": uid, "name": self.test_name}],
                                                   [{"_id": str(uuid4()), "name": new_name}]]
            with self.assertRaises(EngineException, msg="Accepted existing username") as e:
                self.topic.edit(self.fake_session, uid, {"username": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("username '{}' is already used".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_del(self):
        """Check that deleting the user named in the session is refused."""
        with self.subTest(i=1):
            uid = str(uuid4())
            self.fake_session["username"] = self.test_name
            user = {"_id": uid, "username": self.test_name, "project_role_mappings": []}
            self.auth.get_user.return_value = user
            with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e:
                self.topic.delete(self.fake_session, uid)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("you cannot delete your own login user", norm(str(e.exception)), "Wrong exception text")
579
580
581 class Test_CommonVimWimSdn(TestCase):
582
@classmethod
def setUpClass(cls):
    """Define the name shared by the tests of this class.

    CIM stands for Common Infrastructure Manager.
    """
    cim_name = "test-cim-topic"
    cls.test_name = cim_name
586
def setUp(self):
    """Build fresh mocks, the topic under test and the fake session for each test."""
    self.db = Mock(dbbase.DbBase())
    self.fs = Mock(fsbase.FsBase())
    self.msg = Mock(msgbase.MsgBase())
    self.auth = Mock(authconn.Authconn(None, None))
    topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
    self.topic = topic
    # The WIM schemas are used for testing because they are the simplest ones
    topic.topic = "wims"
    topic.schema_new = validation.wim_account_new_schema
    topic.schema_edit = validation.wim_account_edit_schema
    self.fake_session = {
        "username": test_name,
        "project_id": (test_pid,),
        "method": None,
        "admin": True,
        "force": False,
        "public": False,
        "allow_show_user_project_role": True,
    }
599
600 def test_new_cvws(self):
601 test_url = "http://0.0.0.0:0"
602 with self.subTest(i=1):
603 rollback = []
604 test_type = "fake"
605 self.db.get_one.return_value = None
606 self.db.create.side_effect = lambda self, content: content["_id"]
607 cid, oid = self.topic.new(rollback, self.fake_session,
608 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
609 self.assertEqual(len(rollback), 1, "Wrong rollback length")
610 args = self.db.create.call_args[0]
611 content = args[1]
612 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
613 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
614 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
615 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
616 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
617 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
618 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
619 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
620 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
621 self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
622 self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects")
623 self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects")
624 self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation")
625 self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations")
626 operation = content["_admin"]["operations"][0]
627 self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type")
628 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
629 self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time")
630 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["created"],
631 "Wrong operation status enter time")
632 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
633 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
634 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
635 # with self.subTest(i=2):
636 # rollback = []
637 # test_type = "bad_type"
638 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
639 # self.topic.new(rollback, self.fake_session,
640 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
641 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
642 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
643 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
644 # norm(str(e.exception)), "Wrong exception text")
645
646 def test_conflict_on_new(self):
647 with self.subTest(i=1):
648 rollback = []
649 test_url = "http://0.0.0.0:0"
650 test_type = "fake"
651 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
652 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
653 self.topic.new(rollback, self.fake_session,
654 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
655 self.assertEqual(len(rollback), 0, "Wrong rollback length")
656 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
657 self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic),
658 norm(str(e.exception)), "Wrong exception text")
659
660 def test_edit_cvws(self):
661 now = time()
662 cid = str(uuid4())
663 test_url = "http://0.0.0.0:0"
664 test_type = "fake"
665 cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type,
666 "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}}
667 with self.subTest(i=1):
668 new_name = "new-cim-name"
669 new_url = "https://1.1.1.1:1"
670 new_type = "onos"
671 self.db.get_one.side_effect = [cvws, None]
672 self.db.replace.return_value = {"updated": 1}
673 # self.db.encrypt.side_effect = [b64str(), b64str()]
674 self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type})
675 args = self.db.replace.call_args[0]
676 content = args[2]
677 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
678 self.assertEqual(args[1], cid, "Wrong CIM identifier")
679 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
680 self.assertEqual(content["name"], new_name, "Wrong CIM name")
681 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
682 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
683 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
684 self.assertGreater(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
685 self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations")
686 operation = content["_admin"]["operations"][1]
687 self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type")
688 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
689 self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time")
690 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["modified"],
691 "Wrong operation status enter time")
692 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
693 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
694 with self.subTest(i=2):
695 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
696 self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
697 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
698 self.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)").
699 format("extra_prop"),
700 norm(str(e.exception)), "Wrong exception text")
701
702 def test_conflict_on_edit(self):
703 with self.subTest(i=1):
704 cid = str(uuid4())
705 new_name = "new-cim-name"
706 self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name},
707 {"_id": str(uuid4()), "name": new_name}]
708 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
709 self.topic.edit(self.fake_session, cid, {"name": new_name})
710 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
711 self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic),
712 norm(str(e.exception)), "Wrong exception text")
713
714 def test_delete_cvws(self):
715 cid = str(uuid4())
716 ro_pid = str(uuid4())
717 rw_pid = str(uuid4())
718 cvws = {"_id": cid, "name": self.test_name}
719 self.db.get_list.return_value = []
720 with self.subTest(i=1):
721 cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]}
722 self.db.get_one.return_value = cvws
723 oid = self.topic.delete(self.fake_session, cid)
724 self.assertIsNone(oid, "Wrong operation identifier")
725 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
726 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
727 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
728 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
729 self.assertEqual(self.db.set_one.call_args[1]["update_dict"]["_admin.projects_read"], [ro_pid, rw_pid],
730 "Wrong read-only projects update")
731 self.assertEqual(self.db.set_one.call_args[1]["update_dict"]["_admin.projects_write"], [rw_pid],
732 "Wrong read/write projects update")
733 with self.subTest(i=2):
734 now = time()
735 cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
736 self.db.get_one.return_value = cvws
737 oid = self.topic.delete(self.fake_session, cid)
738 self.assertEqual(oid, cid+":0", "Wrong operation identifier")
739 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
740 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
741 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
742 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier")
743 update_dict = self.db.set_one.call_args[1]["update_dict"]
744 self.assertEqual(update_dict["_admin.projects_read"], [], "Wrong read-only projects update")
745 self.assertEqual(update_dict["_admin.projects_write"], [], "Wrong read/write projects update")
746 self.assertEqual(update_dict["_admin.to_delete"], True, "Wrong deletion mark")
747 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
748 self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
749 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
750 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status")
751 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
752 self.assertGreater(operation["startTime"], now, "Wrong operation start time")
753 self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
754 with self.subTest(i=3):
755 cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
756 self.db.get_one.return_value = cvws
757 self.fake_session["force"] = True # to force deletion
758 oid = self.topic.delete(self.fake_session, cid)
759 self.assertIsNone(oid, "Wrong operation identifier")
760 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
761 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
762 self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
763 self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
764
765
if __name__ == '__main__':
    # Executed as a script: let unittest discover and run the test cases of this module.
    unittest.main()