Feature 7184: Adding flavor, image info to nsr
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 from unittest import TestCase
22 from unittest.mock import Mock
23 from uuid import uuid4
24 from http import HTTPStatus
25 from time import time
26 from random import randint
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import ProjectTopicAuth, RoleTopicAuth, UserTopicAuth, CommonVimWimSdn
30 from osm_nbi.engine import EngineException
31 from osm_nbi.authconn import AuthconnNotFoundException
32
33
34 test_pid = str(uuid4())
35 test_name = "test-user"
36
37
38 def norm(str):
39 """Normalize string for checking"""
40 return ' '.join(str.strip().split()).lower()
41
42
43 class Test_ProjectTopicAuth(TestCase):
44
45 @classmethod
46 def setUpClass(cls):
47 cls.test_name = "test-project-topic"
48
49 def setUp(self):
50 self.db = Mock(dbbase.DbBase())
51 self.fs = Mock(fsbase.FsBase())
52 self.msg = Mock(msgbase.MsgBase())
53 self.auth = Mock(authconn.Authconn(None, None, None))
54 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
55 self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None,
56 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
57 self.topic.check_quota = Mock(return_value=None) # skip quota
58
59 def test_new_project(self):
60 with self.subTest(i=1):
61 rollback = []
62 pid1 = str(uuid4())
63 self.auth.get_project_list.return_value = []
64 self.auth.create_project.return_value = pid1
65 pid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "quotas": {}})
66 self.assertEqual(len(rollback), 1, "Wrong rollback length")
67 self.assertEqual(pid2, pid1, "Wrong project identifier")
68 content = self.auth.create_project.call_args[0][0]
69 self.assertEqual(content["name"], self.test_name, "Wrong project name")
70 self.assertEqual(content["quotas"], {}, "Wrong quotas")
71 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
72 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
73 with self.subTest(i=2):
74 rollback = []
75 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
76 self.topic.new(rollback, self.fake_session, {"name": "other-project-name", "quotas": {"baditems": 10}})
77 self.assertEqual(len(rollback), 0, "Wrong rollback length")
78 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
79 self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
80 .format("baditems"), norm(str(e.exception)), "Wrong exception text")
81
82 def test_edit_project(self):
83 now = time()
84 pid = str(uuid4())
85 proj = {"_id": pid, "name": self.test_name, "_admin": {"created": now, "modified": now}}
86 with self.subTest(i=1):
87 self.auth.get_project_list.side_effect = [[proj], []]
88 new_name = "new-project-name"
89 quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
90 self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
91 _id, content = self.auth.update_project.call_args[0]
92 self.assertEqual(_id, pid, "Wrong project identifier")
93 self.assertEqual(content["_id"], pid, "Wrong project identifier")
94 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
95 self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
96 self.assertEqual(content["name"], new_name, "Wrong project name")
97 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
98 with self.subTest(i=2):
99 new_name = "other-project-name"
100 quotas = {"baditems": randint(0, 100)}
101 self.auth.get_project_list.side_effect = [[proj], []]
102 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
103 self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
104 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
105 self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
106 .format("baditems"), norm(str(e.exception)), "Wrong exception text")
107
108 def test_conflict_on_new(self):
109 with self.subTest(i=1):
110 rollback = []
111 pid = str(uuid4())
112 with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
113 self.topic.new(rollback, self.fake_session, {"name": pid})
114 self.assertEqual(len(rollback), 0, "Wrong rollback length")
115 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
116 self.assertIn("project name '{}' cannot have an uuid format".format(pid),
117 norm(str(e.exception)), "Wrong exception text")
118 with self.subTest(i=2):
119 rollback = []
120 self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
121 with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
122 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
123 self.assertEqual(len(rollback), 0, "Wrong rollback length")
124 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
125 self.assertIn("project '{}' exists".format(self.test_name),
126 norm(str(e.exception)), "Wrong exception text")
127
128 def test_conflict_on_edit(self):
129 with self.subTest(i=1):
130 self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
131 new_name = str(uuid4())
132 with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
133 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
134 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
135 self.assertIn("project name '{}' cannot have an uuid format".format(new_name),
136 norm(str(e.exception)), "Wrong exception text")
137 with self.subTest(i=2):
138 pid = str(uuid4())
139 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
140 with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as e:
141 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
142 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
143 self.assertIn("you cannot rename project 'admin'",
144 norm(str(e.exception)), "Wrong exception text")
145 with self.subTest(i=3):
146 new_name = "new-project-name"
147 self.auth.get_project_list.side_effect = [[{"_id": test_pid, "name": self.test_name}],
148 [{"_id": str(uuid4()), "name": new_name}]]
149 with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
150 self.topic.edit(self.fake_session, pid, {"name": new_name})
151 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
152 self.assertIn("project '{}' is already used".format(new_name),
153 norm(str(e.exception)), "Wrong exception text")
154
155 def test_delete_project(self):
156 with self.subTest(i=1):
157 pid = str(uuid4())
158 self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"}
159 self.auth.delete_project.return_value = {"deleted": 1}
160 self.auth.get_user_list.return_value = []
161 self.db.get_list.return_value = []
162 rc = self.topic.delete(self.fake_session, pid)
163 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
164 self.assertEqual(self.auth.get_project.call_args[0][0], pid, "Wrong project identifier")
165 self.assertEqual(self.auth.delete_project.call_args[0][0], pid, "Wrong project identifier")
166
167 def test_conflict_on_del(self):
168 with self.subTest(i=1):
169 self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name}
170 with self.assertRaises(EngineException, msg="Accepted deletion of own project") as e:
171 self.topic.delete(self.fake_session, self.test_name)
172 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
173 self.assertIn("you cannot delete your own project", norm(str(e.exception)), "Wrong exception text")
174 with self.subTest(i=2):
175 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
176 with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as e:
177 self.topic.delete(self.fake_session, "admin")
178 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
179 self.assertIn("you cannot delete project 'admin'", norm(str(e.exception)), "Wrong exception text")
180 with self.subTest(i=3):
181 pid = str(uuid4())
182 name = "other-project-name"
183 self.auth.get_project.return_value = {"_id": pid, "name": name}
184 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
185 "project_role_mappings": [{"project": pid, "role": str(uuid4())}]}]
186 with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
187 self.topic.delete(self.fake_session, pid)
188 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
189 self.assertIn("project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name),
190 norm(str(e.exception)), "Wrong exception text")
191 with self.subTest(i=4):
192 self.auth.get_user_list.return_value = []
193 self.db.get_list.return_value = [{"_id": str(uuid4()), "id": self.test_name,
194 "_admin": {"projects_read": [pid], "projects_write": []}}]
195 with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
196 self.topic.delete(self.fake_session, pid)
197 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
198 self.assertIn("project '{}' ({}) is being used by {} '{}'"
199 .format(name, pid, "vnf descriptor", self.test_name),
200 norm(str(e.exception)), "Wrong exception text")
201
202
203 class Test_RoleTopicAuth(TestCase):
204
205 @classmethod
206 def setUpClass(cls):
207 cls.test_name = "test-role-topic"
208 cls.test_operations = ["tokens:get"]
209
210 def setUp(self):
211 self.db = Mock(dbbase.DbBase())
212 self.fs = Mock(fsbase.FsBase())
213 self.msg = Mock(msgbase.MsgBase())
214 self.auth = Mock(authconn.Authconn(None, None, None))
215 self.auth.role_permissions = self.test_operations
216 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
217 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
218 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
219 self.topic.check_quota = Mock(return_value=None) # skip quota
220
221 def test_new_role(self):
222 with self.subTest(i=1):
223 rollback = []
224 rid1 = str(uuid4())
225 perms_in = {"tokens": True}
226 perms_out = {"default": False, "admin": False, "tokens": True}
227 self.auth.get_role_list.return_value = []
228 self.auth.create_role.return_value = rid1
229 rid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in})
230 self.assertEqual(len(rollback), 1, "Wrong rollback length")
231 self.assertEqual(rid2, rid1, "Wrong project identifier")
232 content = self.auth.create_role.call_args[0][0]
233 self.assertEqual(content["name"], self.test_name, "Wrong role name")
234 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
235 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
236 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
237 with self.subTest(i=2):
238 rollback = []
239 with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
240 self.topic.new(rollback, self.fake_session,
241 {"name": "other-role-name", "permissions": {"projects": True}})
242 self.assertEqual(len(rollback), 0, "Wrong rollback length")
243 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
244 self.assertIn("invalid permission '{}'".format("projects"),
245 norm(str(e.exception)), "Wrong exception text")
246
247 def test_edit_role(self):
248 now = time()
249 rid = str(uuid4())
250 role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True},
251 "_admin": {"created": now, "modified": now}}
252 with self.subTest(i=1):
253 self.auth.get_role_list.side_effect = [[role], []]
254 self.auth.get_role.return_value = role
255 new_name = "new-role-name"
256 perms_in = {"tokens": False, "tokens:get": True}
257 perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True}
258 self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
259 content = self.auth.update_role.call_args[0][0]
260 self.assertEqual(content["_id"], rid, "Wrong role identifier")
261 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
262 self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
263 self.assertEqual(content["name"], new_name, "Wrong role name")
264 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
265 with self.subTest(i=2):
266 new_name = "other-role-name"
267 perms_in = {"tokens": False, "tokens:post": True}
268 self.auth.get_role_list.side_effect = [[role], []]
269 with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
270 self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
271 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
272 self.assertIn("invalid permission '{}'".format("tokens:post"),
273 norm(str(e.exception)), "Wrong exception text")
274
275 def test_delete_role(self):
276 with self.subTest(i=1):
277 rid = str(uuid4())
278 role = {"_id": rid, "name": "other-role-name"}
279 self.auth.get_role_list.return_value = [role]
280 self.auth.get_role.return_value = role
281 self.auth.delete_role.return_value = {"deleted": 1}
282 self.auth.get_user_list.return_value = []
283 rc = self.topic.delete(self.fake_session, rid)
284 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
285 self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier")
286 self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier")
287 self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier")
288
289 def test_conflict_on_new(self):
290 with self.subTest(i=1):
291 rollback = []
292 rid = str(uuid4())
293 with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
294 self.topic.new(rollback, self.fake_session, {"name": rid})
295 self.assertEqual(len(rollback), 0, "Wrong rollback length")
296 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
297 self.assertIn("role name '{}' cannot have an uuid format".format(rid),
298 norm(str(e.exception)), "Wrong exception text")
299 with self.subTest(i=2):
300 rollback = []
301 self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}]
302 with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
303 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
304 self.assertEqual(len(rollback), 0, "Wrong rollback length")
305 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
306 self.assertIn("role name '{}' exists".format(self.test_name),
307 norm(str(e.exception)), "Wrong exception text")
308
309 def test_conflict_on_edit(self):
310 rid = str(uuid4())
311 with self.subTest(i=1):
312 self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}]
313 new_name = str(uuid4())
314 with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
315 self.topic.edit(self.fake_session, rid, {"name": new_name})
316 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
317 self.assertIn("role name '{}' cannot have an uuid format".format(new_name),
318 norm(str(e.exception)), "Wrong exception text")
319 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
320 with self.subTest(i=i):
321 rid = str(uuid4())
322 self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}}
323 with self.assertRaises(EngineException, msg="Accepted renaming of role '{}'".format(role_name)) as e:
324 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
325 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
326 self.assertIn("you cannot rename role '{}'".format(role_name),
327 norm(str(e.exception)), "Wrong exception text")
328 with self.subTest(i=i+1):
329 new_name = "new-role-name"
330 self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}],
331 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]]
332 self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}}
333 with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
334 self.topic.edit(self.fake_session, rid, {"name": new_name})
335 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
336 self.assertIn("role name '{}' exists".format(new_name),
337 norm(str(e.exception)), "Wrong exception text")
338
339 def test_conflict_on_del(self):
340 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
341 with self.subTest(i=i):
342 rid = str(uuid4())
343 role = {"_id": rid, "name": role_name}
344 self.auth.get_role_list.return_value = [role]
345 self.auth.get_role.return_value = role
346 with self.assertRaises(EngineException, msg="Accepted deletion of role '{}'".format(role_name)) as e:
347 self.topic.delete(self.fake_session, rid)
348 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
349 self.assertIn("you cannot delete role '{}'".format(role_name),
350 norm(str(e.exception)), "Wrong exception text")
351 with self.subTest(i=i+1):
352 rid = str(uuid4())
353 name = "other-role-name"
354 role = {"_id": rid, "name": name}
355 self.auth.get_role_list.return_value = [role]
356 self.auth.get_role.return_value = role
357 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
358 "project_role_mappings": [{"project": str(uuid4()), "role": rid}]}]
359 with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e:
360 self.topic.delete(self.fake_session, rid)
361 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
362 self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name),
363 norm(str(e.exception)), "Wrong exception text")
364
365
366 class Test_UserTopicAuth(TestCase):
367
368 @classmethod
369 def setUpClass(cls):
370 cls.test_name = "test-user-topic"
371
372 def setUp(self):
373 self.db = Mock(dbbase.DbBase())
374 self.fs = Mock(fsbase.FsBase())
375 self.msg = Mock(msgbase.MsgBase())
376 self.auth = Mock(authconn.Authconn(None, None, None))
377 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
378 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
379 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
380 self.topic.check_quota = Mock(return_value=None) # skip quota
381
382 def test_new_user(self):
383 uid1 = str(uuid4())
384 pid = str(uuid4())
385 self.auth.get_user_list.return_value = []
386 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
387 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
388 with self.subTest(i=1):
389 rollback = []
390 rid = str(uuid4())
391 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
392 prms_in = [{"project": "some_project", "role": "some_role"}]
393 prms_out = [{"project": pid, "role": rid}]
394 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
395 "password": self.test_name,
396 "project_role_mappings": prms_in
397 })
398 self.assertEqual(len(rollback), 1, "Wrong rollback length")
399 self.assertEqual(uid2, uid1, "Wrong project identifier")
400 content = self.auth.create_user.call_args[0][0]
401 self.assertEqual(content["username"], self.test_name, "Wrong project name")
402 self.assertEqual(content["password"], self.test_name, "Wrong password")
403 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
404 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
405 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
406 with self.subTest(i=2):
407 rollback = []
408 def_rid = str(uuid4())
409 def_role = {"_id": def_rid, "name": "project_admin"}
410 self.auth.get_role.return_value = def_role
411 self.auth.get_role_list.return_value = [def_role]
412 prms_out = [{"project": pid, "role": def_rid}]
413 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
414 "password": self.test_name,
415 "projects": ["some_project"]
416 })
417 self.assertEqual(len(rollback), 1, "Wrong rollback length")
418 self.assertEqual(uid2, uid1, "Wrong project identifier")
419 content = self.auth.create_user.call_args[0][0]
420 self.assertEqual(content["username"], self.test_name, "Wrong project name")
421 self.assertEqual(content["password"], self.test_name, "Wrong password")
422 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
423 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
424 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
425 with self.subTest(i=3):
426 rollback = []
427 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
428 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
429 "password": "other-password",
430 "project_role_mappings": [{}]
431 })
432 self.assertEqual(len(rollback), 0, "Wrong rollback length")
433 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
434 self.assertIn("format error at '{}' '{}'"
435 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
436 norm(str(e.exception)), "Wrong exception text")
437 with self.subTest(i=4):
438 rollback = []
439 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
440 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
441 "password": "other-password",
442 "projects": []
443 })
444 self.assertEqual(len(rollback), 0, "Wrong rollback length")
445 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
446 self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]),
447 norm(str(e.exception)), "Wrong exception text")
448
449 def test_edit_user(self):
450 now = time()
451 uid = str(uuid4())
452 pid1 = str(uuid4())
453 rid1 = str(uuid4())
454 prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}]
455 user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms,
456 "_admin": {"created": now, "modified": now}}
457 with self.subTest(i=1):
458 self.auth.get_user_list.side_effect = [[user], []]
459 self.auth.get_user.return_value = user
460 pid2 = str(uuid4())
461 rid2 = str(uuid4())
462 self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"},
463 {"_id": pid1, "name": "project-1"}]
464 self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"},
465 {"_id": rid1, "name": "role-1"}]
466 new_name = "new-user-name"
467 new_pasw = "new-password"
468 add_prms = [{"project": pid2, "role": rid2}]
469 rem_prms = [{"project": pid1, "role": rid1}]
470 self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw,
471 "add_project_role_mappings": add_prms,
472 "remove_project_role_mappings": rem_prms
473 })
474 content = self.auth.update_user.call_args[0][0]
475 self.assertEqual(content["_id"], uid, "Wrong user identifier")
476 self.assertEqual(content["username"], new_name, "Wrong user name")
477 self.assertEqual(content["password"], new_pasw, "Wrong user password")
478 self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add")
479 self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove")
480 with self.subTest(i=2):
481 new_name = "other-user-name"
482 new_prms = [{}]
483 self.auth.get_role_list.side_effect = [[user], []]
484 self.auth.get_user_list.side_effect = [[user]]
485 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
486 self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms})
487 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
488 self.assertIn("format error at '{}' '{}'"
489 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
490 norm(str(e.exception)), "Wrong exception text")
491
492 def test_delete_user(self):
493 with self.subTest(i=1):
494 uid = str(uuid4())
495 self.fake_session["username"] = self.test_name
496 user = user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []}
497 self.auth.get_user.return_value = user
498 self.auth.delete_user.return_value = {"deleted": 1}
499 rc = self.topic.delete(self.fake_session, uid)
500 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
501 self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier")
502 self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier")
503
504 def test_conflict_on_new(self):
505 with self.subTest(i=1):
506 rollback = []
507 uid = str(uuid4())
508 with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
509 self.topic.new(rollback, self.fake_session, {"username": uid, "password": self.test_name,
510 "projects": [test_pid]})
511 self.assertEqual(len(rollback), 0, "Wrong rollback length")
512 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
513 self.assertIn("username '{}' cannot have a uuid format".format(uid),
514 norm(str(e.exception)), "Wrong exception text")
515 with self.subTest(i=2):
516 rollback = []
517 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}]
518 with self.assertRaises(EngineException, msg="Accepted existing username") as e:
519 self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
520 "projects": [test_pid]})
521 self.assertEqual(len(rollback), 0, "Wrong rollback length")
522 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
523 self.assertIn("username '{}' is already used".format(self.test_name),
524 norm(str(e.exception)), "Wrong exception text")
525 with self.subTest(i=3):
526 rollback = []
527 self.auth.get_user_list.return_value = []
528 self.auth.get_role_list.side_effect = [[], []]
529 with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
530 self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
531 "projects": [str(uuid4())]})
532 self.assertEqual(len(rollback), 0, "Wrong rollback length")
533 self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
534 self.assertIn("can't find default role for user '{}'".format(self.test_name),
535 norm(str(e.exception)), "Wrong exception text")
536
537 def test_conflict_on_edit(self):
538 uid = str(uuid4())
539 with self.subTest(i=1):
540 self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
541 new_name = str(uuid4())
542 with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
543 self.topic.edit(self.fake_session, uid, {"username": new_name})
544 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
545 self.assertIn("username '{}' cannot have an uuid format".format(new_name),
546 norm(str(e.exception)), "Wrong exception text")
547 with self.subTest(i=2):
548 self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
549 self.auth.get_role_list.side_effect = [[], []]
550 with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
551 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
552 self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
553 self.assertIn("can't find a default role for user '{}'".format(self.test_name),
554 norm(str(e.exception)), "Wrong exception text")
555 with self.subTest(i=3):
556 admin_uid = str(uuid4())
557 self.auth.get_user_list.return_value = [{"_id": admin_uid, "username": "admin"}]
558 with self.assertRaises(EngineException, msg="Accepted removing system_admin role from admin user") as e:
559 self.topic.edit(self.fake_session, admin_uid,
560 {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]})
561 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
562 self.assertIn("you cannot remove system_admin role from admin user",
563 norm(str(e.exception)), "Wrong exception text")
564 with self.subTest(i=4):
565 new_name = "new-user-name"
566 self.auth.get_user_list.side_effect = [[{"_id": uid, "name": self.test_name}],
567 [{"_id": str(uuid4()), "name": new_name}]]
568 with self.assertRaises(EngineException, msg="Accepted existing username") as e:
569 self.topic.edit(self.fake_session, uid, {"username": new_name})
570 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
571 self.assertIn("username '{}' is already used".format(new_name),
572 norm(str(e.exception)), "Wrong exception text")
573
574 def test_conflict_on_del(self):
575 with self.subTest(i=1):
576 uid = str(uuid4())
577 self.fake_session["username"] = self.test_name
578 user = user = {"_id": uid, "username": self.test_name, "project_role_mappings": []}
579 self.auth.get_user.return_value = user
580 with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e:
581 self.topic.delete(self.fake_session, uid)
582 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
583 self.assertIn("you cannot delete your own login user", norm(str(e.exception)), "Wrong exception text")
584
585
586 class Test_CommonVimWimSdn(TestCase):
587
588 @classmethod
589 def setUpClass(cls):
590 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
591
592 def setUp(self):
593 self.db = Mock(dbbase.DbBase())
594 self.fs = Mock(fsbase.FsBase())
595 self.msg = Mock(msgbase.MsgBase())
596 self.auth = Mock(authconn.Authconn(None, None, None))
597 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
598 # Use WIM schemas for testing because they are the simplest
599 self.topic._send_msg = Mock()
600 self.topic.topic = "wims"
601 self.topic.schema_new = validation.wim_account_new_schema
602 self.topic.schema_edit = validation.wim_account_edit_schema
603 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
604 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
605 self.topic.check_quota = Mock(return_value=None) # skip quota
606
607 def test_new_cvws(self):
608 test_url = "http://0.0.0.0:0"
609 with self.subTest(i=1):
610 rollback = []
611 test_type = "fake"
612 self.db.get_one.return_value = None
613 self.db.create.side_effect = lambda self, content: content["_id"]
614 cid, oid = self.topic.new(rollback, self.fake_session,
615 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
616 self.assertEqual(len(rollback), 1, "Wrong rollback length")
617 args = self.db.create.call_args[0]
618 content = args[1]
619 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
620 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
621 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
622 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
623 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
624 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
625 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
626 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
627 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
628 self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
629 self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects")
630 self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects")
631 self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation")
632 self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations")
633 operation = content["_admin"]["operations"][0]
634 self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type")
635 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
636 self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time")
637 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["created"],
638 "Wrong operation status enter time")
639 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
640 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
641 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
642 # with self.subTest(i=2):
643 # rollback = []
644 # test_type = "bad_type"
645 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
646 # self.topic.new(rollback, self.fake_session,
647 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
648 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
649 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
650 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
651 # norm(str(e.exception)), "Wrong exception text")
652
653 def test_conflict_on_new(self):
654 with self.subTest(i=1):
655 rollback = []
656 test_url = "http://0.0.0.0:0"
657 test_type = "fake"
658 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
659 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
660 self.topic.new(rollback, self.fake_session,
661 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
662 self.assertEqual(len(rollback), 0, "Wrong rollback length")
663 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
664 self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic),
665 norm(str(e.exception)), "Wrong exception text")
666
667 def test_edit_cvws(self):
668 now = time()
669 cid = str(uuid4())
670 test_url = "http://0.0.0.0:0"
671 test_type = "fake"
672 cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type,
673 "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}}
674 with self.subTest(i=1):
675 new_name = "new-cim-name"
676 new_url = "https://1.1.1.1:1"
677 new_type = "onos"
678 self.db.get_one.side_effect = [cvws, None]
679 self.db.replace.return_value = {"updated": 1}
680 # self.db.encrypt.side_effect = [b64str(), b64str()]
681 self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type})
682 args = self.db.replace.call_args[0]
683 content = args[2]
684 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
685 self.assertEqual(args[1], cid, "Wrong CIM identifier")
686 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
687 self.assertEqual(content["name"], new_name, "Wrong CIM name")
688 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
689 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
690 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
691 self.assertGreater(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
692 self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations")
693 operation = content["_admin"]["operations"][1]
694 self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type")
695 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
696 self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time")
697 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["modified"],
698 "Wrong operation status enter time")
699 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
700 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
701 with self.subTest(i=2):
702 self.db.get_one.side_effect = [cvws]
703 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
704 self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
705 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
706 self.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)").
707 format("extra_prop"),
708 norm(str(e.exception)), "Wrong exception text")
709
710 def test_conflict_on_edit(self):
711 with self.subTest(i=1):
712 cid = str(uuid4())
713 new_name = "new-cim-name"
714 self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name},
715 {"_id": str(uuid4()), "name": new_name}]
716 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
717 self.topic.edit(self.fake_session, cid, {"name": new_name})
718 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
719 self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic),
720 norm(str(e.exception)), "Wrong exception text")
721
722 def test_delete_cvws(self):
723 cid = str(uuid4())
724 ro_pid = str(uuid4())
725 rw_pid = str(uuid4())
726 cvws = {"_id": cid, "name": self.test_name}
727 self.db.get_list.return_value = []
728 with self.subTest(i=1):
729 cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]}
730 self.db.get_one.return_value = cvws
731 oid = self.topic.delete(self.fake_session, cid)
732 self.assertIsNone(oid, "Wrong operation identifier")
733 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
734 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
735 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
736 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
737 self.assertEqual(self.db.set_one.call_args[1]["update_dict"], None,
738 "Wrong read-only projects update")
739 self.assertEqual(self.db.set_one.call_args[1]["pull_list"],
740 {"_admin.projects_read": (test_pid,), "_admin.projects_write": (test_pid,)},
741 "Wrong read/write projects update")
742 self.topic._send_msg.assert_not_called()
743 with self.subTest(i=2):
744 now = time()
745 cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
746 self.db.get_one.return_value = cvws
747 oid = self.topic.delete(self.fake_session, cid)
748 self.assertEqual(oid, cid+":0", "Wrong operation identifier")
749 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
750 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
751 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
752 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier")
753 self.assertEqual(self.db.set_one.call_args[1]["update_dict"], {"_admin.to_delete": True},
754 "Wrong _admin.to_delete update")
755 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
756 self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
757 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
758 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status")
759 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
760 self.assertGreater(operation["startTime"], now, "Wrong operation start time")
761 self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
762 self.topic._send_msg.assert_called_once_with("delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None)
763 with self.subTest(i=3):
764 cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
765 self.db.get_one.return_value = cvws
766 self.topic._send_msg.reset_mock()
767 self.db.get_one.reset_mock()
768 self.db.del_one.reset_mock()
769 self.fake_session["force"] = True # to force deletion
770 self.fake_session["admin"] = True # to force deletion
771 self.fake_session["project_id"] = [] # to force deletion
772 oid = self.topic.delete(self.fake_session, cid)
773 self.assertIsNone(oid, "Wrong operation identifier")
774 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
775 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
776 self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
777 self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
778 self.topic._send_msg.assert_called_once_with("deleted", {"_id": cid, "op_id": None}, not_send_msg=None)
779
780
781 if __name__ == '__main__':
782 unittest.main()