Added coverage tests in tox
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 from unittest import TestCase
22 from unittest.mock import Mock
23 from uuid import uuid4
24 from http import HTTPStatus
25 from time import time
26 from random import randint
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import ProjectTopicAuth, RoleTopicAuth, UserTopicAuth, CommonVimWimSdn
30 from osm_nbi.engine import EngineException
31 from osm_nbi.authconn import AuthconnNotFoundException
32
33
34 test_pid = str(uuid4())
35 test_name = "test-user"
36
37
38 def norm(str):
39 """Normalize string for checking"""
40 return ' '.join(str.strip().split()).lower()
41
42
43 class Test_ProjectTopicAuth(TestCase):
44
45 @classmethod
46 def setUpClass(cls):
47 cls.test_name = "test-project-topic"
48
49 def setUp(self):
50 self.db = Mock(dbbase.DbBase())
51 self.fs = Mock(fsbase.FsBase())
52 self.msg = Mock(msgbase.MsgBase())
53 self.auth = Mock(authconn.Authconn(None, None, None))
54 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
55 self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None,
56 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
57 self.topic.check_quota = Mock(return_value=None) # skip quota
58
59 def test_new_project(self):
60 with self.subTest(i=1):
61 rollback = []
62 pid1 = str(uuid4())
63 self.auth.get_project_list.return_value = []
64 self.auth.create_project.return_value = pid1
65 pid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "quotas": {}})
66 self.assertEqual(len(rollback), 1, "Wrong rollback length")
67 self.assertEqual(pid2, pid1, "Wrong project identifier")
68 content = self.auth.create_project.call_args[0][0]
69 self.assertEqual(content["name"], self.test_name, "Wrong project name")
70 self.assertEqual(content["quotas"], {}, "Wrong quotas")
71 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
72 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
73 with self.subTest(i=2):
74 rollback = []
75 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
76 self.topic.new(rollback, self.fake_session, {"name": "other-project-name", "quotas": {"baditems": 10}})
77 self.assertEqual(len(rollback), 0, "Wrong rollback length")
78 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
79 self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
80 .format("baditems"), norm(str(e.exception)), "Wrong exception text")
81
82 def test_edit_project(self):
83 now = time()
84 pid = str(uuid4())
85 proj = {"_id": pid, "name": self.test_name, "_admin": {"created": now, "modified": now}}
86 with self.subTest(i=1):
87 self.auth.get_project_list.side_effect = [[proj], []]
88 new_name = "new-project-name"
89 quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
90 self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
91 _id, content = self.auth.update_project.call_args[0]
92 self.assertEqual(_id, pid, "Wrong project identifier")
93 self.assertEqual(content["_id"], pid, "Wrong project identifier")
94 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
95 self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
96 self.assertEqual(content["name"], new_name, "Wrong project name")
97 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
98 with self.subTest(i=2):
99 new_name = "other-project-name"
100 quotas = {"baditems": randint(0, 100)}
101 self.auth.get_project_list.side_effect = [[proj], []]
102 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
103 self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
104 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
105 self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
106 .format("baditems"), norm(str(e.exception)), "Wrong exception text")
107
108 def test_conflict_on_new(self):
109 with self.subTest(i=1):
110 rollback = []
111 pid = str(uuid4())
112 with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
113 self.topic.new(rollback, self.fake_session, {"name": pid})
114 self.assertEqual(len(rollback), 0, "Wrong rollback length")
115 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
116 self.assertIn("project name '{}' cannot have an uuid format".format(pid),
117 norm(str(e.exception)), "Wrong exception text")
118 with self.subTest(i=2):
119 rollback = []
120 self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
121 with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
122 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
123 self.assertEqual(len(rollback), 0, "Wrong rollback length")
124 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
125 self.assertIn("project '{}' exists".format(self.test_name),
126 norm(str(e.exception)), "Wrong exception text")
127
128 def test_conflict_on_edit(self):
129 with self.subTest(i=1):
130 self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
131 new_name = str(uuid4())
132 with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
133 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
134 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
135 self.assertIn("project name '{}' cannot have an uuid format".format(new_name),
136 norm(str(e.exception)), "Wrong exception text")
137 with self.subTest(i=2):
138 pid = str(uuid4())
139 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
140 with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as e:
141 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
142 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
143 self.assertIn("you cannot rename project 'admin'",
144 norm(str(e.exception)), "Wrong exception text")
145 with self.subTest(i=3):
146 new_name = "new-project-name"
147 self.auth.get_project_list.side_effect = [[{"_id": test_pid, "name": self.test_name}],
148 [{"_id": str(uuid4()), "name": new_name}]]
149 with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
150 self.topic.edit(self.fake_session, pid, {"name": new_name})
151 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
152 self.assertIn("project '{}' is already used".format(new_name),
153 norm(str(e.exception)), "Wrong exception text")
154
155 def test_delete_project(self):
156 with self.subTest(i=1):
157 pid = str(uuid4())
158 self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"}
159 self.auth.delete_project.return_value = {"deleted": 1}
160 self.auth.get_user_list.return_value = []
161 self.db.get_list.return_value = []
162 rc = self.topic.delete(self.fake_session, pid)
163 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
164 self.assertEqual(self.auth.get_project.call_args[0][0], pid, "Wrong project identifier")
165 self.assertEqual(self.auth.delete_project.call_args[0][0], pid, "Wrong project identifier")
166
167 def test_conflict_on_del(self):
168 with self.subTest(i=1):
169 self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name}
170 with self.assertRaises(EngineException, msg="Accepted deletion of own project") as e:
171 self.topic.delete(self.fake_session, self.test_name)
172 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
173 self.assertIn("you cannot delete your own project", norm(str(e.exception)), "Wrong exception text")
174 with self.subTest(i=2):
175 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
176 with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as e:
177 self.topic.delete(self.fake_session, "admin")
178 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
179 self.assertIn("you cannot delete project 'admin'", norm(str(e.exception)), "Wrong exception text")
180 with self.subTest(i=3):
181 pid = str(uuid4())
182 name = "other-project-name"
183 self.auth.get_project.return_value = {"_id": pid, "name": name}
184 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
185 "project_role_mappings": [{"project": pid, "role": str(uuid4())}]}]
186 with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
187 self.topic.delete(self.fake_session, pid)
188 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
189 self.assertIn("project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name),
190 norm(str(e.exception)), "Wrong exception text")
191 with self.subTest(i=4):
192 self.auth.get_user_list.return_value = []
193 self.db.get_list.return_value = [{"_id": str(uuid4()), "id": self.test_name,
194 "_admin": {"projects_read": [pid], "projects_write": []}}]
195 with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
196 self.topic.delete(self.fake_session, pid)
197 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
198 self.assertIn("project '{}' ({}) is being used by {} '{}'"
199 .format(name, pid, "vnf descriptor", self.test_name),
200 norm(str(e.exception)), "Wrong exception text")
201
202
203 class Test_RoleTopicAuth(TestCase):
204
205 @classmethod
206 def setUpClass(cls):
207 cls.test_name = "test-role-topic"
208 cls.test_operations = ["tokens:get"]
209
210 def setUp(self):
211 self.db = Mock(dbbase.DbBase())
212 self.fs = Mock(fsbase.FsBase())
213 self.msg = Mock(msgbase.MsgBase())
214 self.auth = Mock(authconn.Authconn(None, None, None))
215 self.auth.role_permissions = self.test_operations
216 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
217 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
218 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
219 self.topic.check_quota = Mock(return_value=None) # skip quota
220
221 def test_new_role(self):
222 with self.subTest(i=1):
223 rollback = []
224 rid1 = str(uuid4())
225 perms_in = {"tokens": True}
226 perms_out = {"default": False, "admin": False, "tokens": True}
227 self.auth.get_role_list.return_value = []
228 self.auth.create_role.return_value = rid1
229 rid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in})
230 self.assertEqual(len(rollback), 1, "Wrong rollback length")
231 self.assertEqual(rid2, rid1, "Wrong project identifier")
232 content = self.auth.create_role.call_args[0][0]
233 self.assertEqual(content["name"], self.test_name, "Wrong role name")
234 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
235 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
236 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
237 with self.subTest(i=2):
238 rollback = []
239 with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
240 self.topic.new(rollback, self.fake_session,
241 {"name": "other-role-name", "permissions": {"projects": True}})
242 self.assertEqual(len(rollback), 0, "Wrong rollback length")
243 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
244 self.assertIn("invalid permission '{}'".format("projects"),
245 norm(str(e.exception)), "Wrong exception text")
246
247 def test_edit_role(self):
248 now = time()
249 rid = str(uuid4())
250 role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True},
251 "_admin": {"created": now, "modified": now}}
252 with self.subTest(i=1):
253 self.auth.get_role_list.side_effect = [[role], []]
254 self.auth.get_role.return_value = role
255 new_name = "new-role-name"
256 perms_in = {"tokens": False, "tokens:get": True}
257 perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True}
258 self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
259 content = self.auth.update_role.call_args[0][0]
260 self.assertEqual(content["_id"], rid, "Wrong role identifier")
261 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
262 self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
263 self.assertEqual(content["name"], new_name, "Wrong role name")
264 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
265 with self.subTest(i=2):
266 new_name = "other-role-name"
267 perms_in = {"tokens": False, "tokens:post": True}
268 self.auth.get_role_list.side_effect = [[role], []]
269 with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
270 self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
271 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
272 self.assertIn("invalid permission '{}'".format("tokens:post"),
273 norm(str(e.exception)), "Wrong exception text")
274
275 def test_delete_role(self):
276 with self.subTest(i=1):
277 rid = str(uuid4())
278 role = {"_id": rid, "name": "other-role-name"}
279 self.auth.get_role_list.return_value = [role]
280 self.auth.get_role.return_value = role
281 self.auth.delete_role.return_value = {"deleted": 1}
282 self.auth.get_user_list.return_value = []
283 rc = self.topic.delete(self.fake_session, rid)
284 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
285 self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier")
286 self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier")
287 self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier")
288
289 def test_conflict_on_new(self):
290 with self.subTest(i=1):
291 rollback = []
292 rid = str(uuid4())
293 with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
294 self.topic.new(rollback, self.fake_session, {"name": rid})
295 self.assertEqual(len(rollback), 0, "Wrong rollback length")
296 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
297 self.assertIn("role name '{}' cannot have an uuid format".format(rid),
298 norm(str(e.exception)), "Wrong exception text")
299 with self.subTest(i=2):
300 rollback = []
301 self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}]
302 with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
303 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
304 self.assertEqual(len(rollback), 0, "Wrong rollback length")
305 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
306 self.assertIn("role name '{}' exists".format(self.test_name),
307 norm(str(e.exception)), "Wrong exception text")
308
309 def test_conflict_on_edit(self):
310 rid = str(uuid4())
311 with self.subTest(i=1):
312 self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}]
313 new_name = str(uuid4())
314 with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
315 self.topic.edit(self.fake_session, rid, {"name": new_name})
316 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
317 self.assertIn("role name '{}' cannot have an uuid format".format(new_name),
318 norm(str(e.exception)), "Wrong exception text")
319 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
320 with self.subTest(i=i):
321 rid = str(uuid4())
322 self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}}
323 with self.assertRaises(EngineException, msg="Accepted renaming of role '{}'".format(role_name)) as e:
324 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
325 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
326 self.assertIn("you cannot rename role '{}'".format(role_name),
327 norm(str(e.exception)), "Wrong exception text")
328 with self.subTest(i=i+1):
329 new_name = "new-role-name"
330 self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}],
331 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]]
332 self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}}
333 with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
334 self.topic.edit(self.fake_session, rid, {"name": new_name})
335 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
336 self.assertIn("role name '{}' exists".format(new_name),
337 norm(str(e.exception)), "Wrong exception text")
338
339 def test_conflict_on_del(self):
340 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
341 with self.subTest(i=i):
342 rid = str(uuid4())
343 role = {"_id": rid, "name": role_name}
344 self.auth.get_role_list.return_value = [role]
345 self.auth.get_role.return_value = role
346 with self.assertRaises(EngineException, msg="Accepted deletion of role '{}'".format(role_name)) as e:
347 self.topic.delete(self.fake_session, rid)
348 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
349 self.assertIn("you cannot delete role '{}'".format(role_name),
350 norm(str(e.exception)), "Wrong exception text")
351 with self.subTest(i=i+1):
352 rid = str(uuid4())
353 name = "other-role-name"
354 role = {"_id": rid, "name": name}
355 self.auth.get_role_list.return_value = [role]
356 self.auth.get_role.return_value = role
357 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
358 "project_role_mappings": [{"project": str(uuid4()), "role": rid}]}]
359 with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e:
360 self.topic.delete(self.fake_session, rid)
361 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
362 self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name),
363 norm(str(e.exception)), "Wrong exception text")
364
365
366 class Test_UserTopicAuth(TestCase):
367
368 @classmethod
369 def setUpClass(cls):
370 cls.test_name = "test-user-topic"
371
372 def setUp(self):
373 self.db = Mock(dbbase.DbBase())
374 self.fs = Mock(fsbase.FsBase())
375 self.msg = Mock(msgbase.MsgBase())
376 self.auth = Mock(authconn.Authconn(None, None, None))
377 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
378 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
379 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
380 self.topic.check_quota = Mock(return_value=None) # skip quota
381
382 def test_new_user(self):
383 uid1 = str(uuid4())
384 pid = str(uuid4())
385 self.auth.get_user_list.return_value = []
386 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
387 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
388 with self.subTest(i=1):
389 rollback = []
390 rid = str(uuid4())
391 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
392 prms_in = [{"project": "some_project", "role": "some_role"}]
393 prms_out = [{"project": pid, "role": rid}]
394 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
395 "password": self.test_name,
396 "project_role_mappings": prms_in
397 })
398 self.assertEqual(len(rollback), 1, "Wrong rollback length")
399 self.assertEqual(uid2, uid1, "Wrong project identifier")
400 content = self.auth.create_user.call_args[0][0]
401 self.assertEqual(content["username"], self.test_name, "Wrong project name")
402 self.assertEqual(content["password"], self.test_name, "Wrong password")
403 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
404 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
405 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
406 with self.subTest(i=2):
407 rollback = []
408 def_rid = str(uuid4())
409 def_role = {"_id": def_rid, "name": "project_admin"}
410 self.auth.get_role.return_value = def_role
411 self.auth.get_role_list.return_value = [def_role]
412 prms_out = [{"project": pid, "role": def_rid}]
413 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
414 "password": self.test_name,
415 "projects": ["some_project"]
416 })
417 self.assertEqual(len(rollback), 1, "Wrong rollback length")
418 self.assertEqual(uid2, uid1, "Wrong project identifier")
419 content = self.auth.create_user.call_args[0][0]
420 self.assertEqual(content["username"], self.test_name, "Wrong project name")
421 self.assertEqual(content["password"], self.test_name, "Wrong password")
422 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
423 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
424 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
425 with self.subTest(i=3):
426 rollback = []
427 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
428 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
429 "password": "other-password",
430 "project_role_mappings": [{}]
431 })
432 self.assertEqual(len(rollback), 0, "Wrong rollback length")
433 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
434 self.assertIn("format error at '{}' '{}'"
435 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
436 norm(str(e.exception)), "Wrong exception text")
437 with self.subTest(i=4):
438 rollback = []
439 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
440 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
441 "password": "other-password",
442 "projects": []
443 })
444 self.assertEqual(len(rollback), 0, "Wrong rollback length")
445 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
446 self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]),
447 norm(str(e.exception)), "Wrong exception text")
448
449 def test_edit_user(self):
450 now = time()
451 uid = str(uuid4())
452 pid1 = str(uuid4())
453 rid1 = str(uuid4())
454 prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}]
455 user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms,
456 "_admin": {"created": now, "modified": now}}
457 with self.subTest(i=1):
458 self.auth.get_user_list.side_effect = [[user], []]
459 self.auth.get_user.return_value = user
460 pid2 = str(uuid4())
461 rid2 = str(uuid4())
462 self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"},
463 {"_id": pid1, "name": "project-1"}]
464 self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"},
465 {"_id": rid1, "name": "role-1"}]
466 new_name = "new-user-name"
467 new_pasw = "new-password"
468 add_prms = [{"project": pid2, "role": rid2}]
469 rem_prms = [{"project": pid1, "role": rid1}]
470 self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw,
471 "add_project_role_mappings": add_prms,
472 "remove_project_role_mappings": rem_prms
473 })
474 content = self.auth.update_user.call_args[0][0]
475 self.assertEqual(content["_id"], uid, "Wrong user identifier")
476 self.assertEqual(content["username"], new_name, "Wrong user name")
477 self.assertEqual(content["password"], new_pasw, "Wrong user password")
478 self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add")
479 self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove")
480 with self.subTest(i=2):
481 new_name = "other-user-name"
482 new_prms = [{}]
483 self.auth.get_role_list.side_effect = [[user], []]
484 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
485 self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms})
486 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
487 self.assertIn("format error at '{}' '{}'"
488 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
489 norm(str(e.exception)), "Wrong exception text")
490
491 def test_delete_user(self):
492 with self.subTest(i=1):
493 uid = str(uuid4())
494 self.fake_session["username"] = self.test_name
495 user = user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []}
496 self.auth.get_user.return_value = user
497 self.auth.delete_user.return_value = {"deleted": 1}
498 rc = self.topic.delete(self.fake_session, uid)
499 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
500 self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier")
501 self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier")
502
503 def test_conflict_on_new(self):
504 with self.subTest(i=1):
505 rollback = []
506 uid = str(uuid4())
507 with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
508 self.topic.new(rollback, self.fake_session, {"username": uid, "password": self.test_name,
509 "projects": [test_pid]})
510 self.assertEqual(len(rollback), 0, "Wrong rollback length")
511 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
512 self.assertIn("username '{}' cannot have a uuid format".format(uid),
513 norm(str(e.exception)), "Wrong exception text")
514 with self.subTest(i=2):
515 rollback = []
516 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}]
517 with self.assertRaises(EngineException, msg="Accepted existing username") as e:
518 self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
519 "projects": [test_pid]})
520 self.assertEqual(len(rollback), 0, "Wrong rollback length")
521 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
522 self.assertIn("username '{}' is already used".format(self.test_name),
523 norm(str(e.exception)), "Wrong exception text")
524 with self.subTest(i=3):
525 rollback = []
526 self.auth.get_user_list.return_value = []
527 self.auth.get_role_list.side_effect = [[], []]
528 with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
529 self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
530 "projects": [str(uuid4())]})
531 self.assertEqual(len(rollback), 0, "Wrong rollback length")
532 self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
533 self.assertIn("can't find default role for user '{}'".format(self.test_name),
534 norm(str(e.exception)), "Wrong exception text")
535
536 def test_conflict_on_edit(self):
537 uid = str(uuid4())
538 with self.subTest(i=1):
539 self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
540 new_name = str(uuid4())
541 with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
542 self.topic.edit(self.fake_session, uid, {"username": new_name})
543 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
544 self.assertIn("username '{}' cannot have an uuid format".format(new_name),
545 norm(str(e.exception)), "Wrong exception text")
546 with self.subTest(i=2):
547 self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
548 self.auth.get_role_list.side_effect = [[], []]
549 with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
550 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
551 self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
552 self.assertIn("can't find a default role for user '{}'".format(self.test_name),
553 norm(str(e.exception)), "Wrong exception text")
554 with self.subTest(i=3):
555 admin_uid = str(uuid4())
556 self.auth.get_user_list.return_value = [{"_id": admin_uid, "username": "admin"}]
557 with self.assertRaises(EngineException, msg="Accepted removing system_admin role from admin user") as e:
558 self.topic.edit(self.fake_session, admin_uid,
559 {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]})
560 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
561 self.assertIn("you cannot remove system_admin role from admin user",
562 norm(str(e.exception)), "Wrong exception text")
563 with self.subTest(i=4):
564 new_name = "new-user-name"
565 self.auth.get_user_list.side_effect = [[{"_id": uid, "name": self.test_name}],
566 [{"_id": str(uuid4()), "name": new_name}]]
567 with self.assertRaises(EngineException, msg="Accepted existing username") as e:
568 self.topic.edit(self.fake_session, uid, {"username": new_name})
569 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
570 self.assertIn("username '{}' is already used".format(new_name),
571 norm(str(e.exception)), "Wrong exception text")
572
573 def test_conflict_on_del(self):
574 with self.subTest(i=1):
575 uid = str(uuid4())
576 self.fake_session["username"] = self.test_name
577 user = user = {"_id": uid, "username": self.test_name, "project_role_mappings": []}
578 self.auth.get_user.return_value = user
579 with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e:
580 self.topic.delete(self.fake_session, uid)
581 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
582 self.assertIn("you cannot delete your own login user", norm(str(e.exception)), "Wrong exception text")
583
584
585 class Test_CommonVimWimSdn(TestCase):
586
587 @classmethod
588 def setUpClass(cls):
589 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
590
591 def setUp(self):
592 self.db = Mock(dbbase.DbBase())
593 self.fs = Mock(fsbase.FsBase())
594 self.msg = Mock(msgbase.MsgBase())
595 self.auth = Mock(authconn.Authconn(None, None, None))
596 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
597 # Use WIM schemas for testing because they are the simplest
598 self.topic._send_msg = Mock()
599 self.topic.topic = "wims"
600 self.topic.schema_new = validation.wim_account_new_schema
601 self.topic.schema_edit = validation.wim_account_edit_schema
602 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
603 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
604 self.topic.check_quota = Mock(return_value=None) # skip quota
605
606 def test_new_cvws(self):
607 test_url = "http://0.0.0.0:0"
608 with self.subTest(i=1):
609 rollback = []
610 test_type = "fake"
611 self.db.get_one.return_value = None
612 self.db.create.side_effect = lambda self, content: content["_id"]
613 cid, oid = self.topic.new(rollback, self.fake_session,
614 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
615 self.assertEqual(len(rollback), 1, "Wrong rollback length")
616 args = self.db.create.call_args[0]
617 content = args[1]
618 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
619 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
620 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
621 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
622 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
623 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
624 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
625 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
626 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
627 self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
628 self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects")
629 self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects")
630 self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation")
631 self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations")
632 operation = content["_admin"]["operations"][0]
633 self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type")
634 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
635 self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time")
636 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["created"],
637 "Wrong operation status enter time")
638 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
639 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
640 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
641 # with self.subTest(i=2):
642 # rollback = []
643 # test_type = "bad_type"
644 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
645 # self.topic.new(rollback, self.fake_session,
646 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
647 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
648 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
649 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
650 # norm(str(e.exception)), "Wrong exception text")
651
652 def test_conflict_on_new(self):
653 with self.subTest(i=1):
654 rollback = []
655 test_url = "http://0.0.0.0:0"
656 test_type = "fake"
657 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
658 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
659 self.topic.new(rollback, self.fake_session,
660 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
661 self.assertEqual(len(rollback), 0, "Wrong rollback length")
662 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
663 self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic),
664 norm(str(e.exception)), "Wrong exception text")
665
666 def test_edit_cvws(self):
667 now = time()
668 cid = str(uuid4())
669 test_url = "http://0.0.0.0:0"
670 test_type = "fake"
671 cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type,
672 "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}}
673 with self.subTest(i=1):
674 new_name = "new-cim-name"
675 new_url = "https://1.1.1.1:1"
676 new_type = "onos"
677 self.db.get_one.side_effect = [cvws, None]
678 self.db.replace.return_value = {"updated": 1}
679 # self.db.encrypt.side_effect = [b64str(), b64str()]
680 self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type})
681 args = self.db.replace.call_args[0]
682 content = args[2]
683 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
684 self.assertEqual(args[1], cid, "Wrong CIM identifier")
685 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
686 self.assertEqual(content["name"], new_name, "Wrong CIM name")
687 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
688 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
689 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
690 self.assertGreater(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
691 self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations")
692 operation = content["_admin"]["operations"][1]
693 self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type")
694 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
695 self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time")
696 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["modified"],
697 "Wrong operation status enter time")
698 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
699 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
700 with self.subTest(i=2):
701 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
702 self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
703 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
704 self.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)").
705 format("extra_prop"),
706 norm(str(e.exception)), "Wrong exception text")
707
708 def test_conflict_on_edit(self):
709 with self.subTest(i=1):
710 cid = str(uuid4())
711 new_name = "new-cim-name"
712 self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name},
713 {"_id": str(uuid4()), "name": new_name}]
714 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
715 self.topic.edit(self.fake_session, cid, {"name": new_name})
716 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
717 self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic),
718 norm(str(e.exception)), "Wrong exception text")
719
720 def test_delete_cvws(self):
721 cid = str(uuid4())
722 ro_pid = str(uuid4())
723 rw_pid = str(uuid4())
724 cvws = {"_id": cid, "name": self.test_name}
725 self.db.get_list.return_value = []
726 with self.subTest(i=1):
727 cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]}
728 self.db.get_one.return_value = cvws
729 oid = self.topic.delete(self.fake_session, cid)
730 self.assertIsNone(oid, "Wrong operation identifier")
731 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
732 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
733 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
734 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
735 self.assertEqual(self.db.set_one.call_args[1]["update_dict"], None,
736 "Wrong read-only projects update")
737 self.assertEqual(self.db.set_one.call_args[1]["pull_list"],
738 {"_admin.projects_read": (test_pid,), "_admin.projects_write": (test_pid,)},
739 "Wrong read/write projects update")
740 self.topic._send_msg.assert_not_called()
741 with self.subTest(i=2):
742 now = time()
743 cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
744 self.db.get_one.return_value = cvws
745 oid = self.topic.delete(self.fake_session, cid)
746 self.assertEqual(oid, cid+":0", "Wrong operation identifier")
747 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
748 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
749 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
750 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier")
751 self.assertEqual(self.db.set_one.call_args[1]["update_dict"], {"_admin.to_delete": True},
752 "Wrong _admin.to_delete update")
753 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
754 self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
755 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
756 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status")
757 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
758 self.assertGreater(operation["startTime"], now, "Wrong operation start time")
759 self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
760 self.topic._send_msg.assert_called_once_with("delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None)
761 with self.subTest(i=3):
762 cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
763 self.db.get_one.return_value = cvws
764 self.topic._send_msg.reset_mock()
765 self.db.get_one.reset_mock()
766 self.db.del_one.reset_mock()
767 self.fake_session["force"] = True # to force deletion
768 self.fake_session["admin"] = True # to force deletion
769 self.fake_session["project_id"] = [] # to force deletion
770 oid = self.topic.delete(self.fake_session, cid)
771 self.assertIsNone(oid, "Wrong operation identifier")
772 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
773 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
774 self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
775 self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
776 self.topic._send_msg.assert_called_once_with("deleted", {"_id": cid, "op_id": None}, not_send_msg=None)
777
778
779 if __name__ == '__main__':
780 unittest.main()