blob: d8a4e6391496c002682233f54e942e2711343a87 [file] [log] [blame]
#! /usr/bin/python3
# -*- coding: utf-8 -*-

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16
# Module metadata.
__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
# NOTE(review): the day field reads "019" and there is no closing "$" -- looks like a typo; confirm intended value.
__date__ = "$2019-10-019"
19
20import unittest
21from unittest import TestCase
22from unittest.mock import Mock
23from uuid import uuid4
24from http import HTTPStatus
25from time import time
26from random import randint
27from osm_common import dbbase, fsbase, msgbase
28from osm_nbi import authconn, validation
29from osm_nbi.admin_topics import ProjectTopicAuth, RoleTopicAuth, UserTopicAuth, CommonVimWimSdn
30from osm_nbi.engine import EngineException
31from osm_nbi.authconn import AuthconnNotFoundException
32
33
# Module-level fixtures shared by the test classes in this file:
# a random project identifier placed in every fake session's "project_id" tuple ...
test_pid = str(uuid4())
# ... and the username used in the fake sessions of the classes that do not use their own test_name.
test_name = "test-user"
36
37
def norm(str):
    """Normalize a string for checking.

    Leading and trailing whitespace is dropped, every internal run of
    whitespace is collapsed to a single space and the result is lower-cased.

    NOTE: the parameter name shadows the builtin ``str`` inside this function
    only; it is kept unchanged so that existing callers are unaffected.

    :param str: text to normalize
    :return: the normalized text
    """
    # split() without a separator already discards leading/trailing whitespace,
    # so a separate strip() call is not needed.
    return " ".join(str.split()).lower()
41
42
class Test_ProjectTopicAuth(TestCase):
    """Exercise ProjectTopicAuth (new / edit / delete) against mocked back-ends."""

    @classmethod
    def setUpClass(cls):
        # Serves both as the name of the project under test and as the session's username.
        cls.test_name = "test-project-topic"

    def setUp(self):
        """Create mocked db/fs/msg/auth objects, the topic under test and a fake session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": self.test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }

    def _check_error(self, ctx, http_code, text):
        """Check the exception captured by an assertRaises context.

        :param ctx: context manager returned by assertRaises, after its block has exited
        :param http_code: expected value of the exception's http_code attribute
        :param text: substring expected in the normalized exception message
        """
        self.assertEqual(ctx.exception.http_code, http_code, "Wrong HTTP status code")
        self.assertIn(text, norm(str(ctx.exception)), "Wrong exception text")

    def test_new_project(self):
        """new() hands a valid project to auth.create_project and rejects unknown quota items."""
        with self.subTest(i=1):
            rollback = []
            expected_pid = str(uuid4())
            self.auth.get_project_list.return_value = []
            self.auth.create_project.return_value = expected_pid
            indata = {"name": self.test_name, "quotas": {}}
            returned_pid, _ = self.topic.new(rollback, self.fake_session, indata)
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            self.assertEqual(returned_pid, expected_pid, "Wrong project identifier")
            # First positional argument received by the mocked create_project
            content = self.auth.create_project.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong project name")
            self.assertEqual(content["quotas"], {}, "Wrong quotas")
            admin = content["_admin"]
            self.assertIsNotNone(admin["created"], "Wrong creation time")
            self.assertEqual(admin["modified"], admin["created"], "Wrong modification time")
        with self.subTest(i=2):
            rollback = []
            bad_indata = {"name": "other-project-name", "quotas": {"baditems": 10}}
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as ctx:
                self.topic.new(rollback, self.fake_session, bad_indata)
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._check_error(
                ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
                .format("baditems"))

    def test_edit_project(self):
        """edit() passes the updated project to auth.update_project and rejects unknown quota items."""
        now = time()
        pid = str(uuid4())
        proj = {"_id": pid, "name": self.test_name, "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            self.auth.get_project_list.side_effect = [[proj], []]
            new_name = "new-project-name"
            quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
            self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
            # update_project is expected to have been called with (identifier, content)
            updated_id, content = self.auth.update_project.call_args[0]
            self.assertEqual(updated_id, pid, "Wrong project identifier")
            self.assertEqual(content["_id"], pid, "Wrong project identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
            self.assertEqual(content["name"], new_name, "Wrong project name")
            self.assertEqual(content["quotas"], quotas, "Wrong quotas")
        with self.subTest(i=2):
            new_name = "other-project-name"
            quotas = {"baditems": randint(0, 100)}
            self.auth.get_project_list.side_effect = [[proj], []]
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as ctx:
                self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
            self._check_error(
                ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
                .format("baditems"))

    def test_conflict_on_new(self):
        """new() refuses a uuid-shaped project name and a name that is already listed."""
        with self.subTest(i=1):
            rollback = []
            uuid_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as ctx:
                self.topic.new(rollback, self.fake_session, {"name": uuid_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._check_error(ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                              "project name '{}' cannot have an uuid format".format(uuid_name))
        with self.subTest(i=2):
            rollback = []
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing project name") as ctx:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._check_error(ctx, HTTPStatus.CONFLICT,
                              "project '{}' exists".format(self.test_name))

    def test_conflict_on_edit(self):
        """edit() refuses a uuid-shaped name, renaming 'admin', and a name already in use."""
        with self.subTest(i=1):
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as ctx:
                self.topic.edit(self.fake_session, test_pid, {"name": new_name})
            self._check_error(ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                              "project name '{}' cannot have an uuid format".format(new_name))
        with self.subTest(i=2):
            pid = str(uuid4())
            self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
            with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as ctx:
                self.topic.edit(self.fake_session, pid, {"name": "new-name"})
            self._check_error(ctx, HTTPStatus.CONFLICT, "you cannot rename project 'admin'")
        with self.subTest(i=3):
            new_name = "new-project-name"
            current = [{"_id": test_pid, "name": self.test_name}]
            clashing = [{"_id": str(uuid4()), "name": new_name}]
            # Successive get_project_list calls return these two lists in order.
            self.auth.get_project_list.side_effect = [current, clashing]
            with self.assertRaises(EngineException, msg="Accepted existing project name") as ctx:
                # 'pid' is the identifier created in the previous sub-test.
                self.topic.edit(self.fake_session, pid, {"name": new_name})
            self._check_error(ctx, HTTPStatus.CONFLICT,
                              "project '{}' is already used".format(new_name))

    def test_delete_project(self):
        """delete() returns what auth.delete_project returns when nothing references the project."""
        with self.subTest(i=1):
            pid = str(uuid4())
            self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"}
            self.auth.delete_project.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = []
            result = self.topic.delete(self.fake_session, pid)
            self.assertEqual(result, {"deleted": 1}, "Wrong project deletion return info")
            self.assertEqual(self.auth.get_project.call_args[0][0], pid, "Wrong project identifier")
            self.assertEqual(self.auth.delete_project.call_args[0][0], pid, "Wrong project identifier")

    def test_conflict_on_del(self):
        """delete() refuses the session's own project, 'admin', and projects still in use."""
        with self.subTest(i=1):
            self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name}
            with self.assertRaises(EngineException, msg="Accepted deletion of own project") as ctx:
                self.topic.delete(self.fake_session, self.test_name)
            self._check_error(ctx, HTTPStatus.CONFLICT, "you cannot delete your own project")
        with self.subTest(i=2):
            self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
            with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as ctx:
                self.topic.delete(self.fake_session, "admin")
            self._check_error(ctx, HTTPStatus.CONFLICT, "you cannot delete project 'admin'")
        with self.subTest(i=3):
            pid = str(uuid4())
            name = "other-project-name"
            self.auth.get_project.return_value = {"_id": pid, "name": name}
            mapping = {"project": pid, "role": str(uuid4())}
            self.auth.get_user_list.return_value = [
                {"_id": str(uuid4()), "username": self.test_name, "project_role_mappings": [mapping]},
            ]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as ctx:
                self.topic.delete(self.fake_session, pid)
            self._check_error(
                ctx, HTTPStatus.CONFLICT,
                "project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name))
        with self.subTest(i=4):
            # Reuses 'pid' and 'name' from the previous sub-test; now a database item references the project.
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = [
                {"_id": str(uuid4()), "id": self.test_name,
                 "_admin": {"projects_read": [pid], "projects_write": []}},
            ]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as ctx:
                self.topic.delete(self.fake_session, pid)
            self._check_error(
                ctx, HTTPStatus.CONFLICT,
                "project '{}' ({}) is being used by {} '{}'"
                .format(name, pid, "vnf descriptor", self.test_name))
200
201
class Test_RoleTopicAuth(TestCase):
    """Exercise RoleTopicAuth (new / edit / delete) against mocked back-ends."""

    @classmethod
    def setUpClass(cls):
        cls.test_name = "test-role-topic"
        # Assigned to the mocked auth's role_permissions in setUp.
        cls.test_operations = ["tokens:get"]

    def setUp(self):
        """Create mocked db/fs/msg/auth objects, the topic under test and a fake session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.auth.role_permissions = self.test_operations
        self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }

    def _check_error(self, ctx, http_code, text):
        """Check the exception captured by an assertRaises context.

        :param ctx: context manager returned by assertRaises, after its block has exited
        :param http_code: expected value of the exception's http_code attribute
        :param text: substring expected in the normalized exception message
        """
        self.assertEqual(ctx.exception.http_code, http_code, "Wrong HTTP status code")
        self.assertIn(text, norm(str(ctx.exception)), "Wrong exception text")

    def test_new_role(self):
        """new() hands a valid role to auth.create_role and rejects an invalid permission."""
        with self.subTest(i=1):
            rollback = []
            expected_rid = str(uuid4())
            perms_in = {"tokens": True}
            perms_out = {"default": False, "admin": False, "tokens": True}
            self.auth.get_role_list.return_value = []
            self.auth.create_role.return_value = expected_rid
            indata = {"name": self.test_name, "permissions": perms_in}
            returned_rid, _ = self.topic.new(rollback, self.fake_session, indata)
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            # Message fixed: this compares role identifiers, not project identifiers.
            self.assertEqual(returned_rid, expected_rid, "Wrong role identifier")
            content = self.auth.create_role.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as ctx:
                self.topic.new(rollback, self.fake_session,
                               {"name": "other-role-name", "permissions": {"projects": True}})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._check_error(ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                              "invalid permission '{}'".format("projects"))

    def test_edit_role(self):
        """edit() passes the updated role to auth.update_role and rejects an invalid permission."""
        now = time()
        rid = str(uuid4())
        role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True},
                "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            self.auth.get_role_list.side_effect = [[role], []]
            self.auth.get_role.return_value = role
            new_name = "new-role-name"
            perms_in = {"tokens": False, "tokens:get": True}
            perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True}
            self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            content = self.auth.update_role.call_args[0][0]
            self.assertEqual(content["_id"], rid, "Wrong role identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
            self.assertEqual(content["name"], new_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
        with self.subTest(i=2):
            new_name = "other-role-name"
            perms_in = {"tokens": False, "tokens:post": True}
            self.auth.get_role_list.side_effect = [[role], []]
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as ctx:
                self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            self._check_error(ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                              "invalid permission '{}'".format("tokens:post"))

    def test_delete_role(self):
        """delete() returns what auth.delete_role returns when no user holds the role."""
        with self.subTest(i=1):
            rid = str(uuid4())
            role = {"_id": rid, "name": "other-role-name"}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.delete_role.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            result = self.topic.delete(self.fake_session, rid)
            self.assertEqual(result, {"deleted": 1}, "Wrong role deletion return info")
            self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier")
            self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier")
            self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier")

    def test_conflict_on_new(self):
        """new() refuses a uuid-shaped role name and a name that is already listed."""
        with self.subTest(i=1):
            rollback = []
            uuid_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as ctx:
                self.topic.new(rollback, self.fake_session, {"name": uuid_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._check_error(ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                              "role name '{}' cannot have an uuid format".format(uuid_name))
        with self.subTest(i=2):
            rollback = []
            self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing role name") as ctx:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._check_error(ctx, HTTPStatus.CONFLICT,
                              "role name '{}' exists".format(self.test_name))

    def test_conflict_on_edit(self):
        """edit() refuses a uuid-shaped name, renaming the built-in roles, and a name already in use."""
        rid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as ctx:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self._check_error(ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                              "role name '{}' cannot have an uuid format".format(new_name))
        for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
            with self.subTest(i=i):
                # 'rid' is rebound here; the sub-test after the loop uses the last value.
                rid = str(uuid4())
                self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}}
                with self.assertRaises(EngineException,
                                       msg="Accepted renaming of role '{}'".format(role_name)) as ctx:
                    self.topic.edit(self.fake_session, rid, {"name": "new-name"})
                self._check_error(ctx, HTTPStatus.FORBIDDEN,
                                  "you cannot rename role '{}'".format(role_name))
        with self.subTest(i=i+1):
            new_name = "new-role-name"
            self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}],
                                                   [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]]
            self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}}
            with self.assertRaises(EngineException, msg="Accepted existing role name") as ctx:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self._check_error(ctx, HTTPStatus.CONFLICT,
                              "role name '{}' exists".format(new_name))

    def test_conflict_on_del(self):
        """delete() refuses the built-in roles and a role that a user still holds."""
        for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
            with self.subTest(i=i):
                rid = str(uuid4())
                role = {"_id": rid, "name": role_name}
                self.auth.get_role_list.return_value = [role]
                self.auth.get_role.return_value = role
                with self.assertRaises(EngineException,
                                       msg="Accepted deletion of role '{}'".format(role_name)) as ctx:
                    self.topic.delete(self.fake_session, rid)
                self._check_error(ctx, HTTPStatus.FORBIDDEN,
                                  "you cannot delete role '{}'".format(role_name))
        with self.subTest(i=i+1):
            rid = str(uuid4())
            name = "other-role-name"
            role = {"_id": rid, "name": name}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
                                                     "project_role_mappings": [{"project": str(uuid4()),
                                                                                "role": rid}]}]
            with self.assertRaises(EngineException, msg="Accepted deletion of used role") as ctx:
                self.topic.delete(self.fake_session, rid)
            self._check_error(ctx, HTTPStatus.CONFLICT,
                              "role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name))
362
363
class Test_UserTopicAuth(TestCase):
    """Exercise UserTopicAuth (new / edit / delete) against mocked back-ends."""

    @classmethod
    def setUpClass(cls):
        cls.test_name = "test-user-topic"

    def setUp(self):
        """Create mocked db/fs/msg/auth objects, the topic under test and a fake session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }

    def _check_error(self, ctx, http_code, text):
        """Check the exception captured by an assertRaises context.

        :param ctx: context manager returned by assertRaises, after its block has exited
        :param http_code: expected value of the exception's http_code attribute
        :param text: substring expected in the normalized exception message
        """
        self.assertEqual(ctx.exception.http_code, http_code, "Wrong HTTP status code")
        self.assertIn(text, norm(str(ctx.exception)), "Wrong exception text")

    def _check_created_user(self, returned_uid, expected_uid, rollback, expected_mappings):
        """Check the result of a successful new() and the content passed to auth.create_user."""
        self.assertEqual(len(rollback), 1, "Wrong rollback length")
        # Messages fixed: these compare user identifiers / usernames, not project data.
        self.assertEqual(returned_uid, expected_uid, "Wrong user identifier")
        content = self.auth.create_user.call_args[0][0]
        self.assertEqual(content["username"], self.test_name, "Wrong user name")
        self.assertEqual(content["password"], self.test_name, "Wrong password")
        self.assertEqual(content["project_role_mappings"], expected_mappings, "Wrong project-role mappings")
        self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
        self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")

    def test_new_user(self):
        """new() accepts explicit mappings or a project list, and rejects malformed input."""
        uid1 = str(uuid4())
        pid = str(uuid4())
        self.auth.get_user_list.return_value = []
        self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
        self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
            prms_in = [{"project": "some_project", "role": "some_role"}]
            prms_out = [{"project": pid, "role": rid}]
            uid2, _ = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
                                                                   "password": self.test_name,
                                                                   "project_role_mappings": prms_in
                                                                   })
            self._check_created_user(uid2, uid1, rollback, prms_out)
        with self.subTest(i=2):
            rollback = []
            def_rid = str(uuid4())
            def_role = {"_id": def_rid, "name": "project_admin"}
            self.auth.get_role.return_value = def_role
            self.auth.get_role_list.return_value = [def_role]
            prms_out = [{"project": pid, "role": def_rid}]
            uid2, _ = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
                                                                   "password": self.test_name,
                                                                   "projects": ["some_project"]
                                                                   })
            self._check_created_user(uid2, uid1, rollback, prms_out)
        with self.subTest(i=3):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as ctx:
                self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
                                                             "password": "other-password",
                                                             "project_role_mappings": [{}]
                                                             })
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._check_error(
                ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at '{}' '{}'"
                .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"))
        with self.subTest(i=4):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong projects") as ctx:
                self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
                                                             "password": "other-password",
                                                             "projects": []
                                                             })
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._check_error(
                ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at '{}' '{}'" .format("projects", "{} is too short").format([]))

    def test_edit_user(self):
        """edit() passes the changes to auth.update_user and rejects malformed mappings."""
        now = time()
        uid = str(uuid4())
        pid1 = str(uuid4())
        rid1 = str(uuid4())
        prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}]
        user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms,
                "_admin": {"created": now, "modified": now}}
        with self.subTest(i=1):
            self.auth.get_user_list.side_effect = [[user], []]
            self.auth.get_user.return_value = user
            pid2 = str(uuid4())
            rid2 = str(uuid4())
            self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"},
                                                 {"_id": pid1, "name": "project-1"}]
            self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"},
                                              {"_id": rid1, "name": "role-1"}]
            new_name = "new-user-name"
            new_pasw = "new-password"
            add_prms = [{"project": pid2, "role": rid2}]
            rem_prms = [{"project": pid1, "role": rid1}]
            self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw,
                                                     "add_project_role_mappings": add_prms,
                                                     "remove_project_role_mappings": rem_prms
                                                     })
            content = self.auth.update_user.call_args[0][0]
            self.assertEqual(content["_id"], uid, "Wrong user identifier")
            self.assertEqual(content["username"], new_name, "Wrong user name")
            self.assertEqual(content["password"], new_pasw, "Wrong user password")
            self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add")
            # The removal list is compared with the user's stored mappings (with names), not with rem_prms.
            self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove")
        with self.subTest(i=2):
            new_name = "other-user-name"
            new_prms = [{}]
            # NOTE(review): this programs get_role_list with a *user* record; get_user_list may have been
            # intended -- confirm against UserTopicAuth.edit before changing.
            self.auth.get_role_list.side_effect = [[user], []]
            with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as ctx:
                self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms})
            self._check_error(
                ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                "format error at '{}' '{}'"
                .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"))

    def test_delete_user(self):
        """delete() returns what auth.delete_user returns for a user other than the session's."""
        with self.subTest(i=1):
            uid = str(uuid4())
            self.fake_session["username"] = self.test_name
            user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []}
            self.auth.get_user.return_value = user
            self.auth.delete_user.return_value = {"deleted": 1}
            result = self.topic.delete(self.fake_session, uid)
            self.assertEqual(result, {"deleted": 1}, "Wrong user deletion return info")
            self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier")
            self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier")

    def test_conflict_on_new(self):
        """new() refuses a uuid-shaped username, an existing username, and a missing default role."""
        with self.subTest(i=1):
            rollback = []
            uid = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as username") as ctx:
                self.topic.new(rollback, self.fake_session, {"username": uid, "password": self.test_name,
                                                             "projects": [test_pid]})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._check_error(ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                              "username '{}' cannot have a uuid format".format(uid))
        with self.subTest(i=2):
            rollback = []
            self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing username") as ctx:
                self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
                                                             "projects": [test_pid]})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._check_error(ctx, HTTPStatus.CONFLICT,
                              "username '{}' is already used".format(self.test_name))
        with self.subTest(i=3):
            rollback = []
            self.auth.get_user_list.return_value = []
            self.auth.get_role_list.side_effect = [[], []]
            with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as ctx:
                self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
                                                             "projects": [str(uuid4())]})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self._check_error(ctx, HTTPStatus.NOT_FOUND,
                              "can't find default role for user '{}'".format(self.test_name))

    def test_conflict_on_edit(self):
        """edit() refuses a uuid-shaped name, a missing default role, demoting admin, and a used name."""
        uid = str(uuid4())
        with self.subTest(i=1):
            self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as username") as ctx:
                self.topic.edit(self.fake_session, uid, {"username": new_name})
            self._check_error(ctx, HTTPStatus.UNPROCESSABLE_ENTITY,
                              "username '{}' cannot have an uuid format".format(new_name))
        with self.subTest(i=2):
            self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
            self.auth.get_role_list.side_effect = [[], []]
            with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as ctx:
                self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
            self._check_error(ctx, HTTPStatus.NOT_FOUND,
                              "can't find a default role for user '{}'".format(self.test_name))
        with self.subTest(i=3):
            admin_uid = str(uuid4())
            self.auth.get_user_list.return_value = [{"_id": admin_uid, "username": "admin"}]
            with self.assertRaises(EngineException,
                                   msg="Accepted removing system_admin role from admin user") as ctx:
                self.topic.edit(self.fake_session, admin_uid,
                                {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]})
            self._check_error(ctx, HTTPStatus.FORBIDDEN,
                              "you cannot remove system_admin role from admin user")
        with self.subTest(i=4):
            new_name = "new-user-name"
            self.auth.get_user_list.side_effect = [[{"_id": uid, "name": self.test_name}],
                                                   [{"_id": str(uuid4()), "name": new_name}]]
            with self.assertRaises(EngineException, msg="Accepted existing username") as ctx:
                self.topic.edit(self.fake_session, uid, {"username": new_name})
            self._check_error(ctx, HTTPStatus.CONFLICT,
                              "username '{}' is already used".format(new_name))

    def test_conflict_on_del(self):
        """delete() refuses to remove the user the session is logged in as."""
        with self.subTest(i=1):
            uid = str(uuid4())
            self.fake_session["username"] = self.test_name
            user = {"_id": uid, "username": self.test_name, "project_role_mappings": []}
            self.auth.get_user.return_value = user
            with self.assertRaises(EngineException, msg="Accepted deletion of own user") as ctx:
                self.topic.delete(self.fake_session, uid)
            self._check_error(ctx, HTTPStatus.CONFLICT, "you cannot delete your own login user")
580
581
class Test_CommonVimWimSdn(TestCase):
    """Unit tests for CommonVimWimSdn, exercised through the WIM account schemas.

    The database, filesystem, message bus and authentication back-ends are all mocks, so every
    test inspects the arguments the topic hands to them instead of any persisted state.
    """

    @classmethod
    def setUpClass(cls):
        cls.test_name = "test-cim-topic"  # CIM = Common Infrastructure Manager

    def setUp(self):
        """Create a fresh topic with mocked back-ends and an admin session before each test."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
        # Replace the notification hook so tests can assert on the messages that would be sent
        self.topic._send_msg = Mock()
        # Use WIM schemas for testing because they are the simplest
        self.topic.topic = "wims"
        self.topic.schema_new = validation.wim_account_new_schema
        self.topic.schema_edit = validation.wim_account_edit_schema
        self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}

    def test_new_cvws(self):
        """Check the document stored and the rollback entry recorded when a CIM is created."""
        test_url = "http://0.0.0.0:0"
        with self.subTest(i=1):
            rollback = []
            test_type = "fake"
            # No entry with this name exists yet
            self.db.get_one.return_value = None
            # The mock receives (topic, content) positionally; answer with the generated identifier
            self.db.create.side_effect = lambda _topic, content: content["_id"]
            cid, oid = self.topic.new(rollback, self.fake_session,
                                      {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            args = self.db.create.call_args[0]
            content = args[1]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
            self.assertEqual(content["wim_url"], test_url, "Wrong URL")
            self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
            self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
            self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
            admin = content["_admin"]
            self.assertIsNotNone(admin["created"], "Wrong creation time")
            self.assertEqual(admin["modified"], admin["created"], "Wrong modification time")
            self.assertEqual(admin["operationalState"], "PROCESSING", "Wrong operational state")
            self.assertEqual(admin["projects_read"], [test_pid], "Wrong read-only projects")
            self.assertEqual(admin["projects_write"], [test_pid], "Wrong read/write projects")
            self.assertIsNone(admin["current_operation"], "Wrong current operation")
            self.assertEqual(len(admin["operations"]), 1, "Wrong number of operations")
            operation = admin["operations"][0]
            self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertGreater(operation["startTime"], admin["created"], "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], admin["created"],
                               "Wrong operation status enter time")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
        # This test is disabled. From Feature 8030 we admit all WIM/SDN types
        # with self.subTest(i=2):
        #     rollback = []
        #     test_type = "bad_type"
        #     with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
        #         self.topic.new(rollback, self.fake_session,
        #                        {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
        #     self.assertEqual(len(rollback), 0, "Wrong rollback length")
        #     self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        #     self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
        #                   norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_new(self):
        """Creating a CIM whose name is already taken must fail with CONFLICT and leave no rollback."""
        with self.subTest(i=1):
            rollback = []
            test_url = "http://0.0.0.0:0"
            test_type = "fake"
            # The lookup reports an already stored entry carrying the requested name
            self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
            with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
                self.topic.new(rollback, self.fake_session,
                               {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic),
                          norm(str(e.exception)), "Wrong exception text")

    def test_edit_cvws(self):
        """Check the replacement document written on edit and the rejection of unknown properties."""
        now = time()
        cid = str(uuid4())
        test_url = "http://0.0.0.0:0"
        test_type = "fake"
        cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type,
                "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}}
        with self.subTest(i=1):
            new_name = "new-cim-name"
            new_url = "https://1.1.1.1:1"
            new_type = "onos"
            # First lookup returns the stored CIM; the second finds no other entry using the new name
            self.db.get_one.side_effect = [cvws, None]
            self.db.replace.return_value = {"updated": 1}
            # self.db.encrypt.side_effect = [b64str(), b64str()]
            self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type})
            args = self.db.replace.call_args[0]
            content = args[2]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(args[1], cid, "Wrong CIM identifier")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], new_name, "Wrong CIM name")
            self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
            self.assertEqual(content["wim_url"], new_url, "Wrong URL")
            admin = content["_admin"]
            self.assertEqual(admin["created"], now, "Wrong creation time")
            self.assertGreater(admin["modified"], admin["created"], "Wrong modification time")
            self.assertEqual(len(admin["operations"]), 2, "Wrong number of operations")
            operation = admin["operations"][1]
            self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertGreater(operation["startTime"], admin["modified"], "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], admin["modified"],
                               "Wrong operation status enter time")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
        with self.subTest(i=2):
            extra_prop = "extra_prop"
            with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
                self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", extra_prop: "anything"})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            expected = "format error 'additional properties are not allowed ('{}' was unexpected)'".format(extra_prop)
            self.assertIn(expected, norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Renaming a CIM to a name owned by another entry must fail with CONFLICT."""
        with self.subTest(i=1):
            cid = str(uuid4())
            new_name = "new-cim-name"
            # First lookup returns the CIM being edited, the second a different CIM holding the new name
            self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name},
                                           {"_id": str(uuid4()), "name": new_name}]
            with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
                self.topic.edit(self.fake_session, cid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_cvws(self):
        """Cover the three deletion outcomes: project detach, deferred delete and forced delete."""
        cid = str(uuid4())
        ro_pid = str(uuid4())
        rw_pid = str(uuid4())
        cvws = {"_id": cid, "name": self.test_name}
        self.db.get_list.return_value = []
        with self.subTest(i=1):
            # Other projects still reference the CIM: only the session project is pulled, no message is sent
            cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]}
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertIsNone(self.db.set_one.call_args[1]["update_dict"], "Wrong read-only projects update")
            expected_pull = {"_admin.projects_read." + test_pid: None,
                             "_admin.projects_write." + test_pid: None}
            self.assertEqual(self.db.set_one.call_args[1]["pull"], expected_pull,
                             "Wrong read/write projects update")
            self.topic._send_msg.assert_not_called()
        with self.subTest(i=2):
            # The session project is the only owner: the CIM is flagged for deletion and a "delete" is notified
            now = time()
            cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[1]["update_dict"], {"_admin.to_delete": True},
                             "Wrong _admin.to_delete update")
            operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
            self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
            self.assertGreater(operation["startTime"], now, "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
            self.topic._send_msg.assert_called_once_with("delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None)
        with self.subTest(i=3):
            # Forced deletion without any project: the entry is removed directly and "deleted" is notified
            cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
            self.db.get_one.return_value = cvws
            # Drop the calls recorded by the previous subtests before asserting on this one
            self.topic._send_msg.reset_mock()
            self.db.get_one.reset_mock()
            self.db.del_one.reset_mock()
            self.fake_session["force"] = True    # to force deletion
            self.fake_session["admin"] = True    # to force deletion
            self.fake_session["project_id"] = []    # to force deletion
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.topic._send_msg.assert_called_once_with("deleted", {"_id": cid, "op_id": None}, not_send_msg=None)
delacruzramo79e40f42019-10-10 16:36:40 +0200773
774
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()