d8a4e6391496c002682233f54e942e2711343a87
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
# Module metadata: author contact and creation date of this test module.
__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
__date__ = "$2019-10-19"  # the day used to be mistyped as "019"
19
20 import unittest
21 from unittest import TestCase
22 from unittest.mock import Mock
23 from uuid import uuid4
24 from http import HTTPStatus
25 from time import time
26 from random import randint
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import ProjectTopicAuth, RoleTopicAuth, UserTopicAuth, CommonVimWimSdn
30 from osm_nbi.engine import EngineException
31 from osm_nbi.authconn import AuthconnNotFoundException
32
33
# User name placed in the fake sessions of the role, user and CIM test cases.
test_name = "test-user"
# Random project identifier that every fake session in this module is bound to.
test_pid = str(uuid4())
36
37
def norm(str):
    """Return the text lower-cased, trimmed and with whitespace runs collapsed to one space.

    Used to compare exception texts regardless of case and spacing.
    """
    # split() without arguments already discards leading/trailing whitespace
    words = str.split()
    collapsed = " ".join(words)
    return collapsed.lower()
41
42
class Test_ProjectTopicAuth(TestCase):
    """Tests of ProjectTopicAuth executed against mocked db, fs, msg and auth back-ends."""

    @classmethod
    def setUpClass(cls):
        """Define the project name shared by all the tests of this class."""
        cls.test_name = "test-project-topic"

    def setUp(self):
        """Build fresh mocks, the topic under test and a fake admin session for each test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}

    def test_new_project(self):
        """Create a project with valid quotas, then check that unknown quota items are rejected."""
        with self.subTest(i=1):
            rollback = []
            pid1 = str(uuid4())
            self.auth.get_project_list.return_value = []
            self.auth.create_project.return_value = pid1
            pid2, _ = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "quotas": {}})
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(pid2, pid1, "Wrong project identifier")
            # Inspect what was handed over to the auth back-end
            content = self.auth.create_project.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong project name")
            self.assertEqual(content["quotas"], {}, "Wrong quotas")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.new(rollback, self.fake_session, {"name": "other-project-name", "quotas": {"baditems": 10}})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
                          .format("baditems"), norm(str(e.exception)), "Wrong exception text")

    def test_edit_project(self):
        """Rename a project and set quotas, then check that unknown quota items are rejected."""
        now = time()
        pid = str(uuid4())
        proj = {"_id": pid, "name": self.test_name, "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            # First lookup returns the project being edited, second one reports the new name as free
            self.auth.get_project_list.side_effect = [[proj], []]
            new_name = "new-project-name"
            quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
            self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
            _id, content = self.auth.update_project.call_args[0]
            self.assertEqual(_id, pid, "Wrong project identifier")
            self.assertEqual(content["_id"], pid, "Wrong project identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
            self.assertEqual(content["name"], new_name, "Wrong project name")
            self.assertEqual(content["quotas"], quotas, "Wrong quotas")
        with self.subTest(i=2):
            new_name = "other-project-name"
            quotas = {"baditems": randint(0, 100)}
            self.auth.get_project_list.side_effect = [[proj], []]
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
                self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
                          .format("baditems"), norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_new(self):
        """Check that uuid-like and already existing project names are refused on creation."""
        with self.subTest(i=1):
            rollback = []
            pid = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
                self.topic.new(rollback, self.fake_session, {"name": pid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("project name '{}' cannot have an uuid format".format(pid),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            rollback = []
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("project '{}' exists".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Check that invalid renamings (uuid name, project 'admin', name already in use) are refused."""
        with self.subTest(i=1):
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
                self.topic.edit(self.fake_session, test_pid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("project name '{}' cannot have an uuid format".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            pid = str(uuid4())
            self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
            with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as e:
                self.topic.edit(self.fake_session, pid, {"name": "new-name"})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("you cannot rename project 'admin'",
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=3):
            new_name = "new-project-name"
            # First lookup returns the edited project, second one a different project owning the new name
            self.auth.get_project_list.side_effect = [[{"_id": test_pid, "name": self.test_name}],
                                                      [{"_id": str(uuid4()), "name": new_name}]]
            with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
                # Edit the project returned by the mock instead of the id leaked from the previous sub-test
                self.topic.edit(self.fake_session, test_pid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("project '{}' is already used".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_project(self):
        """Delete a project that is neither referenced by users nor by database items."""
        with self.subTest(i=1):
            pid = str(uuid4())
            self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"}
            self.auth.delete_project.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = []
            rc = self.topic.delete(self.fake_session, pid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
            self.assertEqual(self.auth.get_project.call_args[0][0], pid, "Wrong project identifier")
            self.assertEqual(self.auth.delete_project.call_args[0][0], pid, "Wrong project identifier")

    def test_conflict_on_del(self):
        """Check that the own project, project 'admin' and projects still in use cannot be deleted."""
        # Shared by sub-tests 3 and 4; defined here so that each of them is self-contained
        pid = str(uuid4())
        name = "other-project-name"
        with self.subTest(i=1):
            self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name}
            with self.assertRaises(EngineException, msg="Accepted deletion of own project") as e:
                self.topic.delete(self.fake_session, self.test_name)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("you cannot delete your own project", norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
            with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as e:
                self.topic.delete(self.fake_session, "admin")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("you cannot delete project 'admin'", norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=3):
            self.auth.get_project.return_value = {"_id": pid, "name": name}
            self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
                                                     "project_role_mappings": [{"project": pid,
                                                                                "role": str(uuid4())}]}]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
                self.topic.delete(self.fake_session, pid)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=4):
            # Re-state the project lookup so this sub-test does not depend on sub-test 3 having run
            self.auth.get_project.return_value = {"_id": pid, "name": name}
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = [{"_id": str(uuid4()), "id": self.test_name,
                                              "_admin": {"projects_read": [pid], "projects_write": []}}]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
                self.topic.delete(self.fake_session, pid)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("project '{}' ({}) is being used by {} '{}'"
                          .format(name, pid, "vnf descriptor", self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
200
201
class Test_RoleTopicAuth(TestCase):
    """Tests of RoleTopicAuth executed against mocked db, fs, msg and auth back-ends."""

    @classmethod
    def setUpClass(cls):
        """Define the role name and the list of operations shared by all the tests of this class."""
        cls.test_name = "test-role-topic"
        cls.test_operations = ["tokens:get"]

    def setUp(self):
        """Build fresh mocks, the topic under test and a fake admin session for each test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        # Operations exposed by the mocked auth back-end through its role_permissions attribute
        self.auth.role_permissions = self.test_operations
        self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}

    def test_new_role(self):
        """Create a role with valid permissions, then check that an invalid permission is rejected."""
        with self.subTest(i=1):
            rollback = []
            rid1 = str(uuid4())
            perms_in = {"tokens": True}
            # The stored permissions are expected to carry explicit "default" and "admin" entries
            perms_out = {"default": False, "admin": False, "tokens": True}
            self.auth.get_role_list.return_value = []
            self.auth.create_role.return_value = rid1
            rid2, _ = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in})
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(rid2, rid1, "Wrong role identifier")
            content = self.auth.create_role.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
                self.topic.new(rollback, self.fake_session,
                               {"name": "other-role-name", "permissions": {"projects": True}})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("invalid permission '{}'".format("projects"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_edit_role(self):
        """Rename a role and change its permissions, then check that an invalid permission is rejected."""
        now = time()
        rid = str(uuid4())
        role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True},
                "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            # First lookup returns the role being edited, second one reports the new name as free
            self.auth.get_role_list.side_effect = [[role], []]
            self.auth.get_role.return_value = role
            new_name = "new-role-name"
            perms_in = {"tokens": False, "tokens:get": True}
            perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True}
            self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            content = self.auth.update_role.call_args[0][0]
            self.assertEqual(content["_id"], rid, "Wrong role identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
            self.assertEqual(content["name"], new_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
        with self.subTest(i=2):
            new_name = "other-role-name"
            perms_in = {"tokens": False, "tokens:post": True}
            self.auth.get_role_list.side_effect = [[role], []]
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("invalid permission '{}'".format("tokens:post"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_role(self):
        """Delete a role that is not assigned to any user."""
        with self.subTest(i=1):
            rid = str(uuid4())
            role = {"_id": rid, "name": "other-role-name"}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.delete_role.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            rc = self.topic.delete(self.fake_session, rid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
            self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier")
            self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier")
            self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier")

    def test_conflict_on_new(self):
        """Check that uuid-like and already existing role names are refused on creation."""
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
                self.topic.new(rollback, self.fake_session, {"name": rid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("role name '{}' cannot have an uuid format".format(rid),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            rollback = []
            self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role name '{}' exists".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Check that invalid renamings (uuid name, system roles, name already in use) are refused."""
        rid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("role name '{}' cannot have an uuid format".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")
        # One sub-test per role that must not be renamed
        for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
            with self.subTest(i=i):
                rid = str(uuid4())
                self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}}
                with self.assertRaises(EngineException, msg="Accepted renaming of role '{}'".format(role_name)) as e:
                    self.topic.edit(self.fake_session, rid, {"name": "new-name"})
                self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
                self.assertIn("you cannot rename role '{}'".format(role_name),
                              norm(str(e.exception)), "Wrong exception text")
        # Continue the sub-test numbering after the loop
        with self.subTest(i=i+1):
            new_name = "new-role-name"
            self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}],
                                                   [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]]
            self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}}
            with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role name '{}' exists".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_del(self):
        """Check that system roles and roles still assigned to a user cannot be deleted."""
        # One sub-test per role that must not be deleted
        for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
            with self.subTest(i=i):
                rid = str(uuid4())
                role = {"_id": rid, "name": role_name}
                self.auth.get_role_list.return_value = [role]
                self.auth.get_role.return_value = role
                with self.assertRaises(EngineException, msg="Accepted deletion of role '{}'".format(role_name)) as e:
                    self.topic.delete(self.fake_session, rid)
                self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
                self.assertIn("you cannot delete role '{}'".format(role_name),
                              norm(str(e.exception)), "Wrong exception text")
        # Continue the sub-test numbering after the loop
        with self.subTest(i=i+1):
            rid = str(uuid4())
            name = "other-role-name"
            role = {"_id": rid, "name": name}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
                                                     "project_role_mappings": [{"project": str(uuid4()),
                                                                                "role": rid}]}]
            with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e:
                self.topic.delete(self.fake_session, rid)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
362
363
class Test_UserTopicAuth(TestCase):
    """Tests of UserTopicAuth executed against mocked db, fs, msg and auth back-ends."""

    @classmethod
    def setUpClass(cls):
        """Define the user name shared by all the tests of this class."""
        cls.test_name = "test-user-topic"

    def setUp(self):
        """Build fresh mocks, the topic under test and a fake admin session for each test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}

    def test_new_user(self):
        """Create users from project-role mappings and from a project list; reject malformed input."""
        uid1 = str(uuid4())
        pid = str(uuid4())
        self.auth.get_user_list.return_value = []
        self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
        self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
            # Mappings are given by name and expected to reach the auth back-end by identifier
            prms_in = [{"project": "some_project", "role": "some_role"}]
            prms_out = [{"project": pid, "role": rid}]
            uid2, _ = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
                                                                   "password": self.test_name,
                                                                   "project_role_mappings": prms_in
                                                                   })
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(uid2, uid1, "Wrong user identifier")
            content = self.auth.create_user.call_args[0][0]
            self.assertEqual(content["username"], self.test_name, "Wrong user name")
            self.assertEqual(content["password"], self.test_name, "Wrong password")
            self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=2):
            rollback = []
            def_rid = str(uuid4())
            def_role = {"_id": def_rid, "name": "project_admin"}
            self.auth.get_role.return_value = def_role
            self.auth.get_role_list.return_value = [def_role]
            # Only "projects" is given: the mapping is expected to use the mocked "project_admin" role
            prms_out = [{"project": pid, "role": def_rid}]
            uid2, _ = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
                                                                   "password": self.test_name,
                                                                   "projects": ["some_project"]
                                                                   })
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(uid2, uid1, "Wrong user identifier")
            content = self.auth.create_user.call_args[0][0]
            self.assertEqual(content["username"], self.test_name, "Wrong user name")
            self.assertEqual(content["password"], self.test_name, "Wrong password")
            self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=3):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
                self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
                                                             "password": "other-password",
                                                             "project_role_mappings": [{}]
                                                             })
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error at '{}' '{}'"
                          .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=4):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
                self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
                                                             "password": "other-password",
                                                             "projects": []
                                                             })
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]),
                          norm(str(e.exception)), "Wrong exception text")

    def test_edit_user(self):
        """Edit name, password and project-role mappings of a user; reject malformed mappings."""
        now = time()
        uid = str(uuid4())
        pid1 = str(uuid4())
        rid1 = str(uuid4())
        prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}]
        user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms,
                "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            self.auth.get_user_list.side_effect = [[user], []]
            self.auth.get_user.return_value = user
            pid2 = str(uuid4())
            rid2 = str(uuid4())
            # Successive project/role lookups return the added pair first and the removed pair next
            self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"},
                                                 {"_id": pid1, "name": "project-1"}]
            self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"},
                                              {"_id": rid1, "name": "role-1"}]
            new_name = "new-user-name"
            new_pasw = "new-password"
            add_prms = [{"project": pid2, "role": rid2}]
            rem_prms = [{"project": pid1, "role": rid1}]
            self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw,
                                                     "add_project_role_mappings": add_prms,
                                                     "remove_project_role_mappings": rem_prms
                                                     })
            content = self.auth.update_user.call_args[0][0]
            self.assertEqual(content["_id"], uid, "Wrong user identifier")
            self.assertEqual(content["username"], new_name, "Wrong user name")
            self.assertEqual(content["password"], new_pasw, "Wrong user password")
            self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add")
            self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove")
        with self.subTest(i=2):
            new_name = "other-user-name"
            new_prms = [{}]
            # NOTE(review): a user record is fed to get_role_list here; get_user_list may have been
            # intended - confirm against UserTopicAuth.edit before changing it
            self.auth.get_role_list.side_effect = [[user], []]
            with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
                self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error at '{}' '{}'"
                          .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_user(self):
        """Delete a user different from the one logged in the fake session."""
        with self.subTest(i=1):
            uid = str(uuid4())
            self.fake_session["username"] = self.test_name
            user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []}
            self.auth.get_user.return_value = user
            self.auth.delete_user.return_value = {"deleted": 1}
            rc = self.topic.delete(self.fake_session, uid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
            self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier")
            self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier")

    def test_conflict_on_new(self):
        """Check that uuid-like names, names in use and a missing default role are refused on creation."""
        with self.subTest(i=1):
            rollback = []
            uid = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
                self.topic.new(rollback, self.fake_session, {"username": uid, "password": self.test_name,
                                                             "projects": [test_pid]})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("username '{}' cannot have a uuid format".format(uid),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            rollback = []
            self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing username") as e:
                self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
                                                             "projects": [test_pid]})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("username '{}' is already used".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=3):
            rollback = []
            self.auth.get_user_list.return_value = []
            # No role can be found at all, so no default role is available
            self.auth.get_role_list.side_effect = [[], []]
            with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
                self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
                                                             "projects": [str(uuid4())]})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
            self.assertIn("can't find default role for user '{}'".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Check that invalid user editions are refused.

        Covers an uuid-like name, a missing default role, removing system_admin from
        user 'admin' and a name that is already in use.
        """
        uid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
                self.topic.edit(self.fake_session, uid, {"username": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("username '{}' cannot have an uuid format".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=2):
            self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
            self.auth.get_role_list.side_effect = [[], []]
            with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
                self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
            self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
            self.assertIn("can't find a default role for user '{}'".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=3):
            admin_uid = str(uuid4())
            self.auth.get_user_list.return_value = [{"_id": admin_uid, "username": "admin"}]
            with self.assertRaises(EngineException, msg="Accepted removing system_admin role from admin user") as e:
                self.topic.edit(self.fake_session, admin_uid,
                                {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]})
            self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
            self.assertIn("you cannot remove system_admin role from admin user",
                          norm(str(e.exception)), "Wrong exception text")
        with self.subTest(i=4):
            new_name = "new-user-name"
            # NOTE(review): these mocked users carry "name" rather than "username" - confirm it is intended
            self.auth.get_user_list.side_effect = [[{"_id": uid, "name": self.test_name}],
                                                   [{"_id": str(uuid4()), "name": new_name}]]
            with self.assertRaises(EngineException, msg="Accepted existing username") as e:
                self.topic.edit(self.fake_session, uid, {"username": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("username '{}' is already used".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_del(self):
        """Check that the user logged in the fake session cannot delete itself."""
        with self.subTest(i=1):
            uid = str(uuid4())
            self.fake_session["username"] = self.test_name
            user = {"_id": uid, "username": self.test_name, "project_role_mappings": []}
            self.auth.get_user.return_value = user
            with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e:
                self.topic.delete(self.fake_session, uid)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("you cannot delete your own login user", norm(str(e.exception)), "Wrong exception text")
580
581
class Test_CommonVimWimSdn(TestCase):
    """Unit tests for CommonVimWimSdn, run against mocked database, storage, message and auth backends."""

    @classmethod
    def setUpClass(cls):
        """Set the item name shared by all the tests of this class."""
        cls.test_name = "test-cim-topic"    # CIM = Common Infrastructure Manager

    def setUp(self):
        """Build a fresh topic with mocked backends and a fake admin session before each test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
        # Use WIM schemas for testing because they are the simplest
        self.topic._send_msg = Mock()
        self.topic.topic = "wims"
        self.topic.schema_new = validation.wim_account_new_schema
        self.topic.schema_edit = validation.wim_account_edit_schema
        self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}

    def test_new_cvws(self):
        """Check the content written to the database when a new item is created."""
        test_url = "http://0.0.0.0:0"
        with self.subTest(i=1):
            rollback = []
            test_type = "fake"
            self.db.get_one.return_value = None
            # db.create is called with (topic, content); echo back the identifier stored in the content
            self.db.create.side_effect = lambda topic, content: content["_id"]
            cid, oid = self.topic.new(rollback, self.fake_session,
                                      {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            args = self.db.create.call_args[0]
            content = args[1]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
            self.assertEqual(content["wim_url"], test_url, "Wrong URL")
            self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
            self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
            self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
            self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
            self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects")
            self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects")
            self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation")
            self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations")
            operation = content["_admin"]["operations"][0]
            self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], content["_admin"]["created"],
                               "Wrong operation status enter time")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
        # This test is disabled. From Feature 8030 we admit all WIM/SDN types
        # with self.subTest(i=2):
        #     rollback = []
        #     test_type = "bad_type"
        #     with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
        #         self.topic.new(rollback, self.fake_session,
        #                        {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
        #     self.assertEqual(len(rollback), 0, "Wrong rollback length")
        #     self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        #     self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
        #                   norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_new(self):
        """Check that creating an item whose name is already found in the database is rejected."""
        with self.subTest(i=1):
            rollback = []
            test_url = "http://0.0.0.0:0"
            test_type = "fake"
            # The database reports an existing item with the requested name
            self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
            with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
                self.topic.new(rollback, self.fake_session,
                               {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic),
                          norm(str(e.exception)), "Wrong exception text")

    def test_edit_cvws(self):
        """Check the content written on a valid edit, and the rejection of an unknown property."""
        now = time()
        cid = str(uuid4())
        test_url = "http://0.0.0.0:0"
        test_type = "fake"
        cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type,
                "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}}
        with self.subTest(i=1):
            new_name = "new-cim-name"
            new_url = "https://1.1.1.1:1"
            new_type = "onos"
            # First lookup returns the item being edited, second one finds no item using the new name
            self.db.get_one.side_effect = [cvws, None]
            self.db.replace.return_value = {"updated": 1}
            # self.db.encrypt.side_effect = [b64str(), b64str()]
            self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type})
            args = self.db.replace.call_args[0]
            content = args[2]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(args[1], cid, "Wrong CIM identifier")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], new_name, "Wrong CIM name")
            self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
            self.assertEqual(content["wim_url"], new_url, "Wrong URL")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
            self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations")
            operation = content["_admin"]["operations"][1]
            self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], content["_admin"]["modified"],
                               "Wrong operation status enter time")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
        with self.subTest(i=2):
            with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
                self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)").
                          format("extra_prop"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Check that renaming an item to a name already found in the database is rejected."""
        with self.subTest(i=1):
            cid = str(uuid4())
            new_name = "new-cim-name"
            # First lookup returns the item being edited, second one a different item owning the new name
            self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name},
                                           {"_id": str(uuid4()), "name": new_name}]
            with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
                self.topic.edit(self.fake_session, cid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_cvws(self):
        """Check the database updates and messages produced by the three deletion scenarios below.

        1. The item lists other projects besides the session one: the session project is pulled
           from the read/write lists, no operation identifier is returned and no message is sent.
        2. The item lists only the session project: it is flagged with ``_admin.to_delete``,
           a "delete" operation is pushed and a "delete" message is sent.
        3. Forced deletion with an empty session project list: the item is removed with
           ``del_one`` and a "deleted" message is sent.
        """
        cid = str(uuid4())
        ro_pid = str(uuid4())
        rw_pid = str(uuid4())
        cvws = {"_id": cid, "name": self.test_name}
        self.db.get_list.return_value = []
        with self.subTest(i=1):
            cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]}
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertIsNone(self.db.set_one.call_args[1]["update_dict"],
                              "Wrong read-only projects update")
            self.assertEqual(self.db.set_one.call_args[1]["pull"], {"_admin.projects_read." + test_pid: None,
                                                                    "_admin.projects_write." + test_pid: None},
                             "Wrong read/write projects update")
            self.topic._send_msg.assert_not_called()
        with self.subTest(i=2):
            now = time()
            cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertEqual(oid, cid+":0", "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[1]["update_dict"], {"_admin.to_delete": True},
                             "Wrong _admin.to_delete update")
            operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
            self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
            self.assertGreater(operation["startTime"], now, "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
            self.topic._send_msg.assert_called_once_with("delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None)
        with self.subTest(i=3):
            cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
            self.db.get_one.return_value = cvws
            # Forget the calls recorded by the previous sub-tests before checking the new ones
            self.topic._send_msg.reset_mock()
            self.db.get_one.reset_mock()
            self.db.del_one.reset_mock()
            self.fake_session["force"] = True   # to force deletion
            self.fake_session["admin"] = True   # to force deletion
            self.fake_session["project_id"] = []   # to force deletion
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.topic._send_msg.assert_called_once_with("deleted", {"_id": cid, "op_id": None}, not_send_msg=None)
773
774
# Run every test case of this module when the file is executed as a script
if __name__ == "__main__":
    unittest.main()