739b68c1360a92f1fb35016ecae897e5cfc3cdef
[osm/NBI.git] / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 from unittest import TestCase
22 from unittest.mock import Mock
23 from uuid import uuid4
24 from http import HTTPStatus
25 from time import time
26 from random import randint
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import ProjectTopicAuth, RoleTopicAuth, UserTopicAuth, CommonVimWimSdn
30 from osm_nbi.engine import EngineException
31 from osm_nbi.authconn import AuthconnNotFoundException
32
33
34 test_pid = str(uuid4())
35 test_name = "test-user"
36
37
38 def norm(str):
39 """Normalize string for checking"""
40 return ' '.join(str.strip().split()).lower()
41
42
43 class Test_ProjectTopicAuth(TestCase):
44
45 @classmethod
46 def setUpClass(cls):
47 cls.test_name = "test-project-topic"
48
49 def setUp(self):
50 self.db = Mock(dbbase.DbBase())
51 self.fs = Mock(fsbase.FsBase())
52 self.msg = Mock(msgbase.MsgBase())
53 self.auth = Mock(authconn.Authconn(None, None, None))
54 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
55 self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None,
56 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
57
58 def test_new_project(self):
59 with self.subTest(i=1):
60 rollback = []
61 pid1 = str(uuid4())
62 self.auth.get_project_list.return_value = []
63 self.auth.create_project.return_value = pid1
64 pid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "quotas": {}})
65 self.assertEqual(len(rollback), 1, "Wrong rollback length")
66 self.assertEqual(pid2, pid1, "Wrong project identifier")
67 content = self.auth.create_project.call_args[0][0]
68 self.assertEqual(content["name"], self.test_name, "Wrong project name")
69 self.assertEqual(content["quotas"], {}, "Wrong quotas")
70 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
71 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
72 with self.subTest(i=2):
73 rollback = []
74 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
75 self.topic.new(rollback, self.fake_session, {"name": "other-project-name", "quotas": {"baditems": 10}})
76 self.assertEqual(len(rollback), 0, "Wrong rollback length")
77 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
78 self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
79 .format("baditems"), norm(str(e.exception)), "Wrong exception text")
80
81 def test_edit_project(self):
82 now = time()
83 pid = str(uuid4())
84 proj = {"_id": pid, "name": self.test_name, "_admin": {"created": now, "modified": now}}
85 with self.subTest(i=1):
86 self.auth.get_project_list.side_effect = [[proj], []]
87 new_name = "new-project-name"
88 quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
89 self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
90 _id, content = self.auth.update_project.call_args[0]
91 self.assertEqual(_id, pid, "Wrong project identifier")
92 self.assertEqual(content["_id"], pid, "Wrong project identifier")
93 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
94 self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
95 self.assertEqual(content["name"], new_name, "Wrong project name")
96 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
97 with self.subTest(i=2):
98 new_name = "other-project-name"
99 quotas = {"baditems": randint(0, 100)}
100 self.auth.get_project_list.side_effect = [[proj], []]
101 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
102 self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
103 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
104 self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
105 .format("baditems"), norm(str(e.exception)), "Wrong exception text")
106
107 def test_conflict_on_new(self):
108 with self.subTest(i=1):
109 rollback = []
110 pid = str(uuid4())
111 with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
112 self.topic.new(rollback, self.fake_session, {"name": pid})
113 self.assertEqual(len(rollback), 0, "Wrong rollback length")
114 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
115 self.assertIn("project name '{}' cannot have an uuid format".format(pid),
116 norm(str(e.exception)), "Wrong exception text")
117 with self.subTest(i=2):
118 rollback = []
119 self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
120 with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
121 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
122 self.assertEqual(len(rollback), 0, "Wrong rollback length")
123 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
124 self.assertIn("project '{}' exists".format(self.test_name),
125 norm(str(e.exception)), "Wrong exception text")
126
127 def test_conflict_on_edit(self):
128 with self.subTest(i=1):
129 self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
130 new_name = str(uuid4())
131 with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
132 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
133 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
134 self.assertIn("project name '{}' cannot have an uuid format".format(new_name),
135 norm(str(e.exception)), "Wrong exception text")
136 with self.subTest(i=2):
137 pid = str(uuid4())
138 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
139 with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as e:
140 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
141 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
142 self.assertIn("you cannot rename project 'admin'",
143 norm(str(e.exception)), "Wrong exception text")
144 with self.subTest(i=3):
145 new_name = "new-project-name"
146 self.auth.get_project_list.side_effect = [[{"_id": test_pid, "name": self.test_name}],
147 [{"_id": str(uuid4()), "name": new_name}]]
148 with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
149 self.topic.edit(self.fake_session, pid, {"name": new_name})
150 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
151 self.assertIn("project '{}' is already used".format(new_name),
152 norm(str(e.exception)), "Wrong exception text")
153
154 def test_delete_project(self):
155 with self.subTest(i=1):
156 pid = str(uuid4())
157 self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"}
158 self.auth.delete_project.return_value = {"deleted": 1}
159 self.auth.get_user_list.return_value = []
160 self.db.get_list.return_value = []
161 rc = self.topic.delete(self.fake_session, pid)
162 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
163 self.assertEqual(self.auth.get_project.call_args[0][0], pid, "Wrong project identifier")
164 self.assertEqual(self.auth.delete_project.call_args[0][0], pid, "Wrong project identifier")
165
166 def test_conflict_on_del(self):
167 with self.subTest(i=1):
168 self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name}
169 with self.assertRaises(EngineException, msg="Accepted deletion of own project") as e:
170 self.topic.delete(self.fake_session, self.test_name)
171 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
172 self.assertIn("you cannot delete your own project", norm(str(e.exception)), "Wrong exception text")
173 with self.subTest(i=2):
174 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
175 with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as e:
176 self.topic.delete(self.fake_session, "admin")
177 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
178 self.assertIn("you cannot delete project 'admin'", norm(str(e.exception)), "Wrong exception text")
179 with self.subTest(i=3):
180 pid = str(uuid4())
181 name = "other-project-name"
182 self.auth.get_project.return_value = {"_id": pid, "name": name}
183 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
184 "project_role_mappings": [{"project": pid, "role": str(uuid4())}]}]
185 with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
186 self.topic.delete(self.fake_session, pid)
187 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
188 self.assertIn("project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name),
189 norm(str(e.exception)), "Wrong exception text")
190 with self.subTest(i=4):
191 self.auth.get_user_list.return_value = []
192 self.db.get_list.return_value = [{"_id": str(uuid4()), "id": self.test_name,
193 "_admin": {"projects_read": [pid], "projects_write": []}}]
194 with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
195 self.topic.delete(self.fake_session, pid)
196 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
197 self.assertIn("project '{}' ({}) is being used by {} '{}'"
198 .format(name, pid, "vnf descriptor", self.test_name),
199 norm(str(e.exception)), "Wrong exception text")
200
201
202 class Test_RoleTopicAuth(TestCase):
203
204 @classmethod
205 def setUpClass(cls):
206 cls.test_name = "test-role-topic"
207 cls.test_operations = ["tokens:get"]
208
209 def setUp(self):
210 self.db = Mock(dbbase.DbBase())
211 self.fs = Mock(fsbase.FsBase())
212 self.msg = Mock(msgbase.MsgBase())
213 self.auth = Mock(authconn.Authconn(None, None, None))
214 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth, self.test_operations)
215 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
216 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
217
218 def test_new_role(self):
219 with self.subTest(i=1):
220 rollback = []
221 rid1 = str(uuid4())
222 perms_in = {"tokens": True}
223 perms_out = {"default": False, "admin": False, "tokens": True}
224 self.auth.get_role_list.return_value = []
225 self.auth.create_role.return_value = rid1
226 rid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in})
227 self.assertEqual(len(rollback), 1, "Wrong rollback length")
228 self.assertEqual(rid2, rid1, "Wrong project identifier")
229 content = self.auth.create_role.call_args[0][0]
230 self.assertEqual(content["name"], self.test_name, "Wrong role name")
231 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
232 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
233 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
234 with self.subTest(i=2):
235 rollback = []
236 with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
237 self.topic.new(rollback, self.fake_session,
238 {"name": "other-role-name", "permissions": {"projects": True}})
239 self.assertEqual(len(rollback), 0, "Wrong rollback length")
240 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
241 self.assertIn("invalid permission '{}'".format("projects"),
242 norm(str(e.exception)), "Wrong exception text")
243
244 def test_edit_role(self):
245 now = time()
246 rid = str(uuid4())
247 role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True},
248 "_admin": {"created": now, "modified": now}}
249 with self.subTest(i=1):
250 self.auth.get_role_list.side_effect = [[role], []]
251 self.auth.get_role.return_value = role
252 new_name = "new-role-name"
253 perms_in = {"tokens": False, "tokens:get": True}
254 perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True}
255 self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
256 content = self.auth.update_role.call_args[0][0]
257 self.assertEqual(content["_id"], rid, "Wrong role identifier")
258 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
259 self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
260 self.assertEqual(content["name"], new_name, "Wrong role name")
261 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
262 with self.subTest(i=2):
263 new_name = "other-role-name"
264 perms_in = {"tokens": False, "tokens:post": True}
265 self.auth.get_role_list.side_effect = [[role], []]
266 with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
267 self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
268 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
269 self.assertIn("invalid permission '{}'".format("tokens:post"),
270 norm(str(e.exception)), "Wrong exception text")
271
272 def test_delete_role(self):
273 with self.subTest(i=1):
274 rid = str(uuid4())
275 role = {"_id": rid, "name": "other-role-name"}
276 self.auth.get_role_list.return_value = [role]
277 self.auth.get_role.return_value = role
278 self.auth.delete_role.return_value = {"deleted": 1}
279 self.auth.get_user_list.return_value = []
280 rc = self.topic.delete(self.fake_session, rid)
281 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
282 self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier")
283 self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier")
284 self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier")
285
286 def test_conflict_on_new(self):
287 with self.subTest(i=1):
288 rollback = []
289 rid = str(uuid4())
290 with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
291 self.topic.new(rollback, self.fake_session, {"name": rid})
292 self.assertEqual(len(rollback), 0, "Wrong rollback length")
293 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
294 self.assertIn("role name '{}' cannot have an uuid format".format(rid),
295 norm(str(e.exception)), "Wrong exception text")
296 with self.subTest(i=2):
297 rollback = []
298 self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}]
299 with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
300 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
301 self.assertEqual(len(rollback), 0, "Wrong rollback length")
302 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
303 self.assertIn("role name '{}' exists".format(self.test_name),
304 norm(str(e.exception)), "Wrong exception text")
305
306 def test_conflict_on_edit(self):
307 rid = str(uuid4())
308 with self.subTest(i=1):
309 self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}]
310 new_name = str(uuid4())
311 with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
312 self.topic.edit(self.fake_session, rid, {"name": new_name})
313 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
314 self.assertIn("role name '{}' cannot have an uuid format".format(new_name),
315 norm(str(e.exception)), "Wrong exception text")
316 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
317 with self.subTest(i=i):
318 rid = str(uuid4())
319 self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}}
320 with self.assertRaises(EngineException, msg="Accepted renaming of role '{}'".format(role_name)) as e:
321 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
322 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
323 self.assertIn("you cannot rename role '{}'".format(role_name),
324 norm(str(e.exception)), "Wrong exception text")
325 with self.subTest(i=i+1):
326 new_name = "new-role-name"
327 self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}],
328 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]]
329 self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}}
330 with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
331 self.topic.edit(self.fake_session, rid, {"name": new_name})
332 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
333 self.assertIn("role name '{}' exists".format(new_name),
334 norm(str(e.exception)), "Wrong exception text")
335
336 def test_conflict_on_del(self):
337 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
338 with self.subTest(i=i):
339 rid = str(uuid4())
340 role = {"_id": rid, "name": role_name}
341 self.auth.get_role_list.return_value = [role]
342 self.auth.get_role.return_value = role
343 with self.assertRaises(EngineException, msg="Accepted deletion of role '{}'".format(role_name)) as e:
344 self.topic.delete(self.fake_session, rid)
345 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
346 self.assertIn("you cannot delete role '{}'".format(role_name),
347 norm(str(e.exception)), "Wrong exception text")
348 with self.subTest(i=i+1):
349 rid = str(uuid4())
350 name = "other-role-name"
351 role = {"_id": rid, "name": name}
352 self.auth.get_role_list.return_value = [role]
353 self.auth.get_role.return_value = role
354 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
355 "project_role_mappings": [{"project": str(uuid4()), "role": rid}]}]
356 with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e:
357 self.topic.delete(self.fake_session, rid)
358 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
359 self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name),
360 norm(str(e.exception)), "Wrong exception text")
361
362
363 class Test_UserTopicAuth(TestCase):
364
365 @classmethod
366 def setUpClass(cls):
367 cls.test_name = "test-user-topic"
368
369 def setUp(self):
370 self.db = Mock(dbbase.DbBase())
371 self.fs = Mock(fsbase.FsBase())
372 self.msg = Mock(msgbase.MsgBase())
373 self.auth = Mock(authconn.Authconn(None, None, None))
374 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
375 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
376 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
377
378 def test_new_user(self):
379 uid1 = str(uuid4())
380 pid = str(uuid4())
381 self.auth.get_user_list.return_value = []
382 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
383 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
384 with self.subTest(i=1):
385 rollback = []
386 rid = str(uuid4())
387 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
388 prms_in = [{"project": "some_project", "role": "some_role"}]
389 prms_out = [{"project": pid, "role": rid}]
390 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
391 "password": self.test_name,
392 "project_role_mappings": prms_in
393 })
394 self.assertEqual(len(rollback), 1, "Wrong rollback length")
395 self.assertEqual(uid2, uid1, "Wrong project identifier")
396 content = self.auth.create_user.call_args[0][0]
397 self.assertEqual(content["username"], self.test_name, "Wrong project name")
398 self.assertEqual(content["password"], self.test_name, "Wrong password")
399 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
400 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
401 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
402 with self.subTest(i=2):
403 rollback = []
404 def_rid = str(uuid4())
405 def_role = {"_id": def_rid, "name": "project_admin"}
406 self.auth.get_role.return_value = def_role
407 self.auth.get_role_list.return_value = [def_role]
408 prms_out = [{"project": pid, "role": def_rid}]
409 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
410 "password": self.test_name,
411 "projects": ["some_project"]
412 })
413 self.assertEqual(len(rollback), 1, "Wrong rollback length")
414 self.assertEqual(uid2, uid1, "Wrong project identifier")
415 content = self.auth.create_user.call_args[0][0]
416 self.assertEqual(content["username"], self.test_name, "Wrong project name")
417 self.assertEqual(content["password"], self.test_name, "Wrong password")
418 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
419 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
420 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
421 with self.subTest(i=3):
422 rollback = []
423 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
424 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
425 "password": "other-password",
426 "project_role_mappings": [{}]
427 })
428 self.assertEqual(len(rollback), 0, "Wrong rollback length")
429 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
430 self.assertIn("format error at '{}' '{}'"
431 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
432 norm(str(e.exception)), "Wrong exception text")
433 with self.subTest(i=4):
434 rollback = []
435 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
436 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
437 "password": "other-password",
438 "projects": []
439 })
440 self.assertEqual(len(rollback), 0, "Wrong rollback length")
441 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
442 self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]),
443 norm(str(e.exception)), "Wrong exception text")
444
445 def test_edit_user(self):
446 now = time()
447 uid = str(uuid4())
448 pid1 = str(uuid4())
449 rid1 = str(uuid4())
450 prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}]
451 user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms,
452 "_admin": {"created": now, "modified": now}}
453 with self.subTest(i=1):
454 self.auth.get_user_list.side_effect = [[user], []]
455 self.auth.get_user.return_value = user
456 pid2 = str(uuid4())
457 rid2 = str(uuid4())
458 self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"},
459 {"_id": pid1, "name": "project-1"}]
460 self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"},
461 {"_id": rid1, "name": "role-1"}]
462 new_name = "new-user-name"
463 new_pasw = "new-password"
464 add_prms = [{"project": pid2, "role": rid2}]
465 rem_prms = [{"project": pid1, "role": rid1}]
466 self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw,
467 "add_project_role_mappings": add_prms,
468 "remove_project_role_mappings": rem_prms
469 })
470 content = self.auth.update_user.call_args[0][0]
471 self.assertEqual(content["_id"], uid, "Wrong user identifier")
472 self.assertEqual(content["username"], new_name, "Wrong user name")
473 self.assertEqual(content["password"], new_pasw, "Wrong user password")
474 self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add")
475 self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove")
476 with self.subTest(i=2):
477 new_name = "other-user-name"
478 new_prms = [{}]
479 self.auth.get_role_list.side_effect = [[user], []]
480 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
481 self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms})
482 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
483 self.assertIn("format error at '{}' '{}'"
484 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
485 norm(str(e.exception)), "Wrong exception text")
486
487 def test_delete_user(self):
488 with self.subTest(i=1):
489 uid = str(uuid4())
490 self.fake_session["username"] = self.test_name
491 user = user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []}
492 self.auth.get_user.return_value = user
493 self.auth.delete_user.return_value = {"deleted": 1}
494 rc = self.topic.delete(self.fake_session, uid)
495 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
496 self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier")
497 self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier")
498
499 def test_conflict_on_new(self):
500 with self.subTest(i=1):
501 rollback = []
502 uid = str(uuid4())
503 with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
504 self.topic.new(rollback, self.fake_session, {"username": uid, "password": self.test_name,
505 "projects": [test_pid]})
506 self.assertEqual(len(rollback), 0, "Wrong rollback length")
507 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
508 self.assertIn("username '{}' cannot have a uuid format".format(uid),
509 norm(str(e.exception)), "Wrong exception text")
510 with self.subTest(i=2):
511 rollback = []
512 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}]
513 with self.assertRaises(EngineException, msg="Accepted existing username") as e:
514 self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
515 "projects": [test_pid]})
516 self.assertEqual(len(rollback), 0, "Wrong rollback length")
517 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
518 self.assertIn("username '{}' is already used".format(self.test_name),
519 norm(str(e.exception)), "Wrong exception text")
520 with self.subTest(i=3):
521 rollback = []
522 self.auth.get_user_list.return_value = []
523 self.auth.get_role_list.side_effect = [[], []]
524 with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
525 self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
526 "projects": [str(uuid4())]})
527 self.assertEqual(len(rollback), 0, "Wrong rollback length")
528 self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
529 self.assertIn("can't find default role for user '{}'".format(self.test_name),
530 norm(str(e.exception)), "Wrong exception text")
531
532 def test_conflict_on_edit(self):
533 uid = str(uuid4())
534 with self.subTest(i=1):
535 self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
536 new_name = str(uuid4())
537 with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
538 self.topic.edit(self.fake_session, uid, {"username": new_name})
539 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
540 self.assertIn("username '{}' cannot have an uuid format".format(new_name),
541 norm(str(e.exception)), "Wrong exception text")
542 with self.subTest(i=2):
543 self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
544 self.auth.get_role_list.side_effect = [[], []]
545 with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
546 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
547 self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
548 self.assertIn("can't find a default role for user '{}'".format(self.test_name),
549 norm(str(e.exception)), "Wrong exception text")
550 with self.subTest(i=3):
551 admin_uid = str(uuid4())
552 self.auth.get_user_list.return_value = [{"_id": admin_uid, "username": "admin"}]
553 with self.assertRaises(EngineException, msg="Accepted removing system_admin role from admin user") as e:
554 self.topic.edit(self.fake_session, admin_uid,
555 {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]})
556 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
557 self.assertIn("you cannot remove system_admin role from admin user",
558 norm(str(e.exception)), "Wrong exception text")
559 with self.subTest(i=4):
560 new_name = "new-user-name"
561 self.auth.get_user_list.side_effect = [[{"_id": uid, "name": self.test_name}],
562 [{"_id": str(uuid4()), "name": new_name}]]
563 with self.assertRaises(EngineException, msg="Accepted existing username") as e:
564 self.topic.edit(self.fake_session, uid, {"username": new_name})
565 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
566 self.assertIn("username '{}' is already used".format(new_name),
567 norm(str(e.exception)), "Wrong exception text")
568
569 def test_conflict_on_del(self):
570 with self.subTest(i=1):
571 uid = str(uuid4())
572 self.fake_session["username"] = self.test_name
573 user = user = {"_id": uid, "username": self.test_name, "project_role_mappings": []}
574 self.auth.get_user.return_value = user
575 with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e:
576 self.topic.delete(self.fake_session, uid)
577 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
578 self.assertIn("you cannot delete your own login user", norm(str(e.exception)), "Wrong exception text")
579
580
581 class Test_CommonVimWimSdn(TestCase):
582
583 @classmethod
584 def setUpClass(cls):
585 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
586
587 def setUp(self):
588 self.db = Mock(dbbase.DbBase())
589 self.fs = Mock(fsbase.FsBase())
590 self.msg = Mock(msgbase.MsgBase())
591 self.auth = Mock(authconn.Authconn(None, None, None))
592 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
593 # Use WIM schemas for testing because they are the simplest
594 self.topic.topic = "wims"
595 self.topic.schema_new = validation.wim_account_new_schema
596 self.topic.schema_edit = validation.wim_account_edit_schema
597 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
598 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
599
600 def test_new_cvws(self):
601 test_url = "http://0.0.0.0:0"
602 with self.subTest(i=1):
603 rollback = []
604 test_type = "fake"
605 self.db.get_one.return_value = None
606 self.db.create.side_effect = lambda self, content: content["_id"]
607 cid, oid = self.topic.new(rollback, self.fake_session,
608 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
609 self.assertEqual(len(rollback), 1, "Wrong rollback length")
610 args = self.db.create.call_args[0]
611 content = args[1]
612 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
613 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
614 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
615 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
616 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
617 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
618 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
619 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
620 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
621 self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
622 self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects")
623 self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects")
624 self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation")
625 self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations")
626 operation = content["_admin"]["operations"][0]
627 self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type")
628 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
629 self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time")
630 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["created"],
631 "Wrong operation status enter time")
632 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
633 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
634 with self.subTest(i=2):
635 rollback = []
636 test_type = "bad_type"
637 with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
638 self.topic.new(rollback, self.fake_session,
639 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
640 self.assertEqual(len(rollback), 0, "Wrong rollback length")
641 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
642 self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type, ""),
643 norm(str(e.exception)), "Wrong exception text")
644
645 def test_conflict_on_new(self):
646 with self.subTest(i=1):
647 rollback = []
648 test_url = "http://0.0.0.0:0"
649 test_type = "fake"
650 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
651 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
652 self.topic.new(rollback, self.fake_session,
653 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
654 self.assertEqual(len(rollback), 0, "Wrong rollback length")
655 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
656 self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic),
657 norm(str(e.exception)), "Wrong exception text")
658
659 def test_edit_cvws(self):
660 now = time()
661 cid = str(uuid4())
662 test_url = "http://0.0.0.0:0"
663 test_type = "fake"
664 cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type,
665 "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}}
666 with self.subTest(i=1):
667 new_name = "new-cim-name"
668 new_url = "https://1.1.1.1:1"
669 new_type = "onos"
670 self.db.get_one.side_effect = [cvws, None]
671 self.db.replace.return_value = {"updated": 1}
672 # self.db.encrypt.side_effect = [b64str(), b64str()]
673 self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type})
674 args = self.db.replace.call_args[0]
675 content = args[2]
676 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
677 self.assertEqual(args[1], cid, "Wrong CIM identifier")
678 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
679 self.assertEqual(content["name"], new_name, "Wrong CIM name")
680 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
681 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
682 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
683 self.assertGreater(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
684 self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations")
685 operation = content["_admin"]["operations"][1]
686 self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type")
687 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
688 self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time")
689 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["modified"],
690 "Wrong operation status enter time")
691 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
692 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
693 with self.subTest(i=2):
694 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
695 self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
696 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
697 self.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)").
698 format("extra_prop"),
699 norm(str(e.exception)), "Wrong exception text")
700
701 def test_conflict_on_edit(self):
702 with self.subTest(i=1):
703 cid = str(uuid4())
704 new_name = "new-cim-name"
705 self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name},
706 {"_id": str(uuid4()), "name": new_name}]
707 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
708 self.topic.edit(self.fake_session, cid, {"name": new_name})
709 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
710 self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic),
711 norm(str(e.exception)), "Wrong exception text")
712
713 def test_delete_cvws(self):
714 cid = str(uuid4())
715 ro_pid = str(uuid4())
716 rw_pid = str(uuid4())
717 cvws = {"_id": cid, "name": self.test_name}
718 with self.subTest(i=1):
719 cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]}
720 self.db.get_one.return_value = cvws
721 oid = self.topic.delete(self.fake_session, cid)
722 self.assertIsNone(oid, "Wrong operation identifier")
723 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
724 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
725 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
726 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
727 self.assertEqual(self.db.set_one.call_args[1]["update_dict"]["_admin.projects_read"], [ro_pid, rw_pid],
728 "Wrong read-only projects update")
729 self.assertEqual(self.db.set_one.call_args[1]["update_dict"]["_admin.projects_write"], [rw_pid],
730 "Wrong read/write projects update")
731 with self.subTest(i=3):
732 now = time()
733 cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
734 self.db.get_one.return_value = cvws
735 oid = self.topic.delete(self.fake_session, cid)
736 self.assertEqual(oid, cid+":0", "Wrong operation identifier")
737 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
738 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
739 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
740 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier")
741 update_dict = self.db.set_one.call_args[1]["update_dict"]
742 self.assertEqual(update_dict["_admin.projects_read"], [], "Wrong read-only projects update")
743 self.assertEqual(update_dict["_admin.projects_write"], [], "Wrong read/write projects update")
744 self.assertEqual(update_dict["_admin.to_delete"], True, "Wrong deletion mark")
745 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
746 self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
747 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
748 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status")
749 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
750 self.assertGreater(operation["startTime"], now, "Wrong operation start time")
751 self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
752 with self.subTest(i=3):
753 cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
754 self.db.get_one.return_value = cvws
755 self.fake_session["force"] = True # to force deletion
756 oid = self.topic.delete(self.fake_session, cid)
757 self.assertIsNone(oid, "Wrong operation identifier")
758 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
759 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
760 self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
761 self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
762
763
764 if __name__ == '__main__':
765 unittest.main()