2 # -*- coding: utf-8 -*-
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 __author__
= "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__
= "$2019-10-019"
21 from unittest
import TestCase
22 from unittest
.mock
import Mock
23 from uuid
import uuid4
24 from http
import HTTPStatus
26 from random
import randint
27 from osm_common
import dbbase
, fsbase
, msgbase
28 from osm_nbi
import authconn
, validation
29 from osm_nbi
.admin_topics
import ProjectTopicAuth
, RoleTopicAuth
, UserTopicAuth
, CommonVimWimSdn
30 from osm_nbi
.engine
import EngineException
31 from osm_nbi
.authconn
import AuthconnNotFoundException
34 test_pid
= str(uuid4())
35 test_name
= "test-user"
39 """Normalize string for checking"""
40 return ' '.join(str.strip().split()).lower()
43 class Test_ProjectTopicAuth(TestCase
):
47 cls
.test_name
= "test-project-topic"
50 self
.db
= Mock(dbbase
.DbBase())
51 self
.fs
= Mock(fsbase
.FsBase())
52 self
.msg
= Mock(msgbase
.MsgBase())
53 self
.auth
= Mock(authconn
.Authconn(None, None))
54 self
.topic
= ProjectTopicAuth(self
.db
, self
.fs
, self
.msg
, self
.auth
)
55 self
.fake_session
= {"username": self
.test_name
, "project_id": (test_pid
,), "method": None,
56 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
58 def test_new_project(self
):
59 with self
.subTest(i
=1):
62 self
.auth
.get_project_list
.return_value
= []
63 self
.auth
.create_project
.return_value
= pid1
64 pid2
, oid
= self
.topic
.new(rollback
, self
.fake_session
, {"name": self
.test_name
, "quotas": {}})
65 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
66 self
.assertEqual(pid2
, pid1
, "Wrong project identifier")
67 content
= self
.auth
.create_project
.call_args
[0][0]
68 self
.assertEqual(content
["name"], self
.test_name
, "Wrong project name")
69 self
.assertEqual(content
["quotas"], {}, "Wrong quotas")
70 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
71 self
.assertEqual(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
72 with self
.subTest(i
=2):
74 with self
.assertRaises(EngineException
, msg
="Accepted wrong quotas") as e
:
75 self
.topic
.new(rollback
, self
.fake_session
, {"name": "other-project-name", "quotas": {"baditems": 10}})
76 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
77 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
78 self
.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
79 .format("baditems"), norm(str(e
.exception
)), "Wrong exception text")
81 def test_edit_project(self
):
84 proj
= {"_id": pid
, "name": self
.test_name
, "_admin": {"created": now
, "modified": now
}}
85 with self
.subTest(i
=1):
86 self
.auth
.get_project_list
.side_effect
= [[proj
], []]
87 new_name
= "new-project-name"
88 quotas
= {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
89 self
.topic
.edit(self
.fake_session
, pid
, {"name": new_name
, "quotas": quotas
})
90 _id
, content
= self
.auth
.update_project
.call_args
[0]
91 self
.assertEqual(_id
, pid
, "Wrong project identifier")
92 self
.assertEqual(content
["_id"], pid
, "Wrong project identifier")
93 self
.assertEqual(content
["_admin"]["created"], now
, "Wrong creation time")
94 self
.assertGreater(content
["_admin"]["modified"], now
, "Wrong modification time")
95 self
.assertEqual(content
["name"], new_name
, "Wrong project name")
96 self
.assertEqual(content
["quotas"], quotas
, "Wrong quotas")
97 with self
.subTest(i
=2):
98 new_name
= "other-project-name"
99 quotas
= {"baditems": randint(0, 100)}
100 self
.auth
.get_project_list
.side_effect
= [[proj
], []]
101 with self
.assertRaises(EngineException
, msg
="Accepted wrong quotas") as e
:
102 self
.topic
.edit(self
.fake_session
, pid
, {"name": new_name
, "quotas": quotas
})
103 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
104 self
.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
105 .format("baditems"), norm(str(e
.exception
)), "Wrong exception text")
107 def test_conflict_on_new(self
):
108 with self
.subTest(i
=1):
111 with self
.assertRaises(EngineException
, msg
="Accepted uuid as project name") as e
:
112 self
.topic
.new(rollback
, self
.fake_session
, {"name": pid
})
113 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
114 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
115 self
.assertIn("project name '{}' cannot have an uuid format".format(pid
),
116 norm(str(e
.exception
)), "Wrong exception text")
117 with self
.subTest(i
=2):
119 self
.auth
.get_project_list
.return_value
= [{"_id": test_pid
, "name": self
.test_name
}]
120 with self
.assertRaises(EngineException
, msg
="Accepted existing project name") as e
:
121 self
.topic
.new(rollback
, self
.fake_session
, {"name": self
.test_name
})
122 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
123 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
124 self
.assertIn("project '{}' exists".format(self
.test_name
),
125 norm(str(e
.exception
)), "Wrong exception text")
127 def test_conflict_on_edit(self
):
128 with self
.subTest(i
=1):
129 self
.auth
.get_project_list
.return_value
= [{"_id": test_pid
, "name": self
.test_name
}]
130 new_name
= str(uuid4())
131 with self
.assertRaises(EngineException
, msg
="Accepted uuid as project name") as e
:
132 self
.topic
.edit(self
.fake_session
, test_pid
, {"name": new_name
})
133 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
134 self
.assertIn("project name '{}' cannot have an uuid format".format(new_name
),
135 norm(str(e
.exception
)), "Wrong exception text")
136 with self
.subTest(i
=2):
138 self
.auth
.get_project_list
.return_value
= [{"_id": pid
, "name": "admin"}]
139 with self
.assertRaises(EngineException
, msg
="Accepted renaming of project 'admin'") as e
:
140 self
.topic
.edit(self
.fake_session
, pid
, {"name": "new-name"})
141 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
142 self
.assertIn("you cannot rename project 'admin'",
143 norm(str(e
.exception
)), "Wrong exception text")
144 with self
.subTest(i
=3):
145 new_name
= "new-project-name"
146 self
.auth
.get_project_list
.side_effect
= [[{"_id": test_pid
, "name": self
.test_name
}],
147 [{"_id": str(uuid4()), "name": new_name
}]]
148 with self
.assertRaises(EngineException
, msg
="Accepted existing project name") as e
:
149 self
.topic
.edit(self
.fake_session
, pid
, {"name": new_name
})
150 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
151 self
.assertIn("project '{}' is already used".format(new_name
),
152 norm(str(e
.exception
)), "Wrong exception text")
154 def test_delete_project(self
):
155 with self
.subTest(i
=1):
157 self
.auth
.get_project
.return_value
= {"_id": pid
, "name": "other-project-name"}
158 self
.auth
.delete_project
.return_value
= {"deleted": 1}
159 self
.auth
.get_user_list
.return_value
= []
160 self
.db
.get_list
.return_value
= []
161 rc
= self
.topic
.delete(self
.fake_session
, pid
)
162 self
.assertEqual(rc
, {"deleted": 1}, "Wrong project deletion return info")
163 self
.assertEqual(self
.auth
.get_project
.call_args
[0][0], pid
, "Wrong project identifier")
164 self
.assertEqual(self
.auth
.delete_project
.call_args
[0][0], pid
, "Wrong project identifier")
166 def test_conflict_on_del(self
):
167 with self
.subTest(i
=1):
168 self
.auth
.get_project
.return_value
= {"_id": test_pid
, "name": self
.test_name
}
169 with self
.assertRaises(EngineException
, msg
="Accepted deletion of own project") as e
:
170 self
.topic
.delete(self
.fake_session
, self
.test_name
)
171 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
172 self
.assertIn("you cannot delete your own project", norm(str(e
.exception
)), "Wrong exception text")
173 with self
.subTest(i
=2):
174 self
.auth
.get_project
.return_value
= {"_id": str(uuid4()), "name": "admin"}
175 with self
.assertRaises(EngineException
, msg
="Accepted deletion of project 'admin'") as e
:
176 self
.topic
.delete(self
.fake_session
, "admin")
177 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
178 self
.assertIn("you cannot delete project 'admin'", norm(str(e
.exception
)), "Wrong exception text")
179 with self
.subTest(i
=3):
181 name
= "other-project-name"
182 self
.auth
.get_project
.return_value
= {"_id": pid
, "name": name
}
183 self
.auth
.get_user_list
.return_value
= [{"_id": str(uuid4()), "username": self
.test_name
,
184 "project_role_mappings": [{"project": pid
, "role": str(uuid4())}]}]
185 with self
.assertRaises(EngineException
, msg
="Accepted deletion of used project") as e
:
186 self
.topic
.delete(self
.fake_session
, pid
)
187 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
188 self
.assertIn("project '{}' ({}) is being used by user '{}'".format(name
, pid
, self
.test_name
),
189 norm(str(e
.exception
)), "Wrong exception text")
190 with self
.subTest(i
=4):
191 self
.auth
.get_user_list
.return_value
= []
192 self
.db
.get_list
.return_value
= [{"_id": str(uuid4()), "id": self
.test_name
,
193 "_admin": {"projects_read": [pid
], "projects_write": []}}]
194 with self
.assertRaises(EngineException
, msg
="Accepted deletion of used project") as e
:
195 self
.topic
.delete(self
.fake_session
, pid
)
196 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
197 self
.assertIn("project '{}' ({}) is being used by {} '{}'"
198 .format(name
, pid
, "vnf descriptor", self
.test_name
),
199 norm(str(e
.exception
)), "Wrong exception text")
202 class Test_RoleTopicAuth(TestCase
):
206 cls
.test_name
= "test-role-topic"
207 cls
.test_operations
= ["tokens:get"]
210 self
.db
= Mock(dbbase
.DbBase())
211 self
.fs
= Mock(fsbase
.FsBase())
212 self
.msg
= Mock(msgbase
.MsgBase())
213 self
.auth
= Mock(authconn
.Authconn(None, None))
214 self
.topic
= RoleTopicAuth(self
.db
, self
.fs
, self
.msg
, self
.auth
, self
.test_operations
)
215 self
.fake_session
= {"username": test_name
, "project_id": (test_pid
,), "method": None,
216 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
218 def test_new_role(self
):
219 with self
.subTest(i
=1):
222 perms_in
= {"tokens": True}
223 perms_out
= {"default": False, "admin": False, "tokens": True}
224 self
.auth
.get_role_list
.return_value
= []
225 self
.auth
.create_role
.return_value
= rid1
226 rid2
, oid
= self
.topic
.new(rollback
, self
.fake_session
, {"name": self
.test_name
, "permissions": perms_in
})
227 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
228 self
.assertEqual(rid2
, rid1
, "Wrong project identifier")
229 content
= self
.auth
.create_role
.call_args
[0][0]
230 self
.assertEqual(content
["name"], self
.test_name
, "Wrong role name")
231 self
.assertEqual(content
["permissions"], perms_out
, "Wrong permissions")
232 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
233 self
.assertEqual(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
234 with self
.subTest(i
=2):
236 with self
.assertRaises(EngineException
, msg
="Accepted wrong permissions") as e
:
237 self
.topic
.new(rollback
, self
.fake_session
,
238 {"name": "other-role-name", "permissions": {"projects": True}})
239 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
240 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
241 self
.assertIn("invalid permission '{}'".format("projects"),
242 norm(str(e
.exception
)), "Wrong exception text")
244 def test_edit_role(self
):
247 role
= {"_id": rid
, "name": self
.test_name
, "permissions": {"tokens": True},
248 "_admin": {"created": now
, "modified": now
}}
249 with self
.subTest(i
=1):
250 self
.auth
.get_role_list
.side_effect
= [[role
], []]
251 self
.auth
.get_role
.return_value
= role
252 new_name
= "new-role-name"
253 perms_in
= {"tokens": False, "tokens:get": True}
254 perms_out
= {"default": False, "admin": False, "tokens": False, "tokens:get": True}
255 self
.topic
.edit(self
.fake_session
, rid
, {"name": new_name
, "permissions": perms_in
})
256 content
= self
.auth
.update_role
.call_args
[0][0]
257 self
.assertEqual(content
["_id"], rid
, "Wrong role identifier")
258 self
.assertEqual(content
["_admin"]["created"], now
, "Wrong creation time")
259 self
.assertGreater(content
["_admin"]["modified"], now
, "Wrong modification time")
260 self
.assertEqual(content
["name"], new_name
, "Wrong role name")
261 self
.assertEqual(content
["permissions"], perms_out
, "Wrong permissions")
262 with self
.subTest(i
=2):
263 new_name
= "other-role-name"
264 perms_in
= {"tokens": False, "tokens:post": True}
265 self
.auth
.get_role_list
.side_effect
= [[role
], []]
266 with self
.assertRaises(EngineException
, msg
="Accepted wrong permissions") as e
:
267 self
.topic
.edit(self
.fake_session
, rid
, {"name": new_name
, "permissions": perms_in
})
268 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
269 self
.assertIn("invalid permission '{}'".format("tokens:post"),
270 norm(str(e
.exception
)), "Wrong exception text")
272 def test_delete_role(self
):
273 with self
.subTest(i
=1):
275 role
= {"_id": rid
, "name": "other-role-name"}
276 self
.auth
.get_role_list
.return_value
= [role
]
277 self
.auth
.get_role
.return_value
= role
278 self
.auth
.delete_role
.return_value
= {"deleted": 1}
279 self
.auth
.get_user_list
.return_value
= []
280 rc
= self
.topic
.delete(self
.fake_session
, rid
)
281 self
.assertEqual(rc
, {"deleted": 1}, "Wrong role deletion return info")
282 self
.assertEqual(self
.auth
.get_role_list
.call_args
[0][0]["_id"], rid
, "Wrong role identifier")
283 self
.assertEqual(self
.auth
.get_role
.call_args
[0][0], rid
, "Wrong role identifier")
284 self
.assertEqual(self
.auth
.delete_role
.call_args
[0][0], rid
, "Wrong role identifier")
286 def test_conflict_on_new(self
):
287 with self
.subTest(i
=1):
290 with self
.assertRaises(EngineException
, msg
="Accepted uuid as role name") as e
:
291 self
.topic
.new(rollback
, self
.fake_session
, {"name": rid
})
292 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
293 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
294 self
.assertIn("role name '{}' cannot have an uuid format".format(rid
),
295 norm(str(e
.exception
)), "Wrong exception text")
296 with self
.subTest(i
=2):
298 self
.auth
.get_role_list
.return_value
= [{"_id": str(uuid4()), "name": self
.test_name
}]
299 with self
.assertRaises(EngineException
, msg
="Accepted existing role name") as e
:
300 self
.topic
.new(rollback
, self
.fake_session
, {"name": self
.test_name
})
301 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
302 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
303 self
.assertIn("role name '{}' exists".format(self
.test_name
),
304 norm(str(e
.exception
)), "Wrong exception text")
306 def test_conflict_on_edit(self
):
308 with self
.subTest(i
=1):
309 self
.auth
.get_role_list
.return_value
= [{"_id": rid
, "name": self
.test_name
, "permissions": {}}]
310 new_name
= str(uuid4())
311 with self
.assertRaises(EngineException
, msg
="Accepted uuid as role name") as e
:
312 self
.topic
.edit(self
.fake_session
, rid
, {"name": new_name
})
313 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
314 self
.assertIn("role name '{}' cannot have an uuid format".format(new_name
),
315 norm(str(e
.exception
)), "Wrong exception text")
316 for i
, role_name
in enumerate(["system_admin", "project_admin"], start
=2):
317 with self
.subTest(i
=i
):
319 self
.auth
.get_role
.return_value
= {"_id": rid
, "name": role_name
, "permissions": {}}
320 with self
.assertRaises(EngineException
, msg
="Accepted renaming of role '{}'".format(role_name
)) as e
:
321 self
.topic
.edit(self
.fake_session
, rid
, {"name": "new-name"})
322 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.FORBIDDEN
, "Wrong HTTP status code")
323 self
.assertIn("you cannot rename role '{}'".format(role_name
),
324 norm(str(e
.exception
)), "Wrong exception text")
325 with self
.subTest(i
=i
+1):
326 new_name
= "new-role-name"
327 self
.auth
.get_role_list
.side_effect
= [[{"_id": rid
, "name": self
.test_name
, "permissions": {}}],
328 [{"_id": str(uuid4()), "name": new_name
, "permissions": {}}]]
329 self
.auth
.get_role
.return_value
= {"_id": rid
, "name": self
.test_name
, "permissions": {}}
330 with self
.assertRaises(EngineException
, msg
="Accepted existing role name") as e
:
331 self
.topic
.edit(self
.fake_session
, rid
, {"name": new_name
})
332 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
333 self
.assertIn("role name '{}' exists".format(new_name
),
334 norm(str(e
.exception
)), "Wrong exception text")
336 def test_conflict_on_del(self
):
337 for i
, role_name
in enumerate(["system_admin", "project_admin"], start
=1):
338 with self
.subTest(i
=i
):
340 role
= {"_id": rid
, "name": role_name
}
341 self
.auth
.get_role_list
.return_value
= [role
]
342 self
.auth
.get_role
.return_value
= role
343 with self
.assertRaises(EngineException
, msg
="Accepted deletion of role '{}'".format(role_name
)) as e
:
344 self
.topic
.delete(self
.fake_session
, rid
)
345 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.FORBIDDEN
, "Wrong HTTP status code")
346 self
.assertIn("you cannot delete role '{}'".format(role_name
),
347 norm(str(e
.exception
)), "Wrong exception text")
348 with self
.subTest(i
=i
+1):
350 name
= "other-role-name"
351 role
= {"_id": rid
, "name": name
}
352 self
.auth
.get_role_list
.return_value
= [role
]
353 self
.auth
.get_role
.return_value
= role
354 self
.auth
.get_user_list
.return_value
= [{"_id": str(uuid4()), "username": self
.test_name
,
355 "project_role_mappings": [{"project": str(uuid4()), "role": rid
}]}]
356 with self
.assertRaises(EngineException
, msg
="Accepted deletion of used role") as e
:
357 self
.topic
.delete(self
.fake_session
, rid
)
358 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
359 self
.assertIn("role '{}' ({}) is being used by user '{}'".format(name
, rid
, self
.test_name
),
360 norm(str(e
.exception
)), "Wrong exception text")
363 class Test_UserTopicAuth(TestCase
):
367 cls
.test_name
= "test-user-topic"
370 self
.db
= Mock(dbbase
.DbBase())
371 self
.fs
= Mock(fsbase
.FsBase())
372 self
.msg
= Mock(msgbase
.MsgBase())
373 self
.auth
= Mock(authconn
.Authconn(None, None))
374 self
.topic
= UserTopicAuth(self
.db
, self
.fs
, self
.msg
, self
.auth
)
375 self
.fake_session
= {"username": test_name
, "project_id": (test_pid
,), "method": None,
376 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
378 def test_new_user(self
):
381 self
.auth
.get_user_list
.return_value
= []
382 self
.auth
.get_project
.return_value
= {"_id": pid
, "name": "some_project"}
383 self
.auth
.create_user
.return_value
= {"_id": uid1
, "username": self
.test_name
}
384 with self
.subTest(i
=1):
387 self
.auth
.get_role
.return_value
= {"_id": rid
, "name": "some_role"}
388 prms_in
= [{"project": "some_project", "role": "some_role"}]
389 prms_out
= [{"project": pid
, "role": rid
}]
390 uid2
, oid
= self
.topic
.new(rollback
, self
.fake_session
, {"username": self
.test_name
,
391 "password": self
.test_name
,
392 "project_role_mappings": prms_in
394 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
395 self
.assertEqual(uid2
, uid1
, "Wrong project identifier")
396 content
= self
.auth
.create_user
.call_args
[0][0]
397 self
.assertEqual(content
["username"], self
.test_name
, "Wrong project name")
398 self
.assertEqual(content
["password"], self
.test_name
, "Wrong password")
399 self
.assertEqual(content
["project_role_mappings"], prms_out
, "Wrong project-role mappings")
400 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
401 self
.assertEqual(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
402 with self
.subTest(i
=2):
404 def_rid
= str(uuid4())
405 def_role
= {"_id": def_rid
, "name": "project_admin"}
406 self
.auth
.get_role
.return_value
= def_role
407 self
.auth
.get_role_list
.return_value
= [def_role
]
408 prms_out
= [{"project": pid
, "role": def_rid
}]
409 uid2
, oid
= self
.topic
.new(rollback
, self
.fake_session
, {"username": self
.test_name
,
410 "password": self
.test_name
,
411 "projects": ["some_project"]
413 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
414 self
.assertEqual(uid2
, uid1
, "Wrong project identifier")
415 content
= self
.auth
.create_user
.call_args
[0][0]
416 self
.assertEqual(content
["username"], self
.test_name
, "Wrong project name")
417 self
.assertEqual(content
["password"], self
.test_name
, "Wrong password")
418 self
.assertEqual(content
["project_role_mappings"], prms_out
, "Wrong project-role mappings")
419 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
420 self
.assertEqual(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
421 with self
.subTest(i
=3):
423 with self
.assertRaises(EngineException
, msg
="Accepted wrong project-role mappings") as e
:
424 self
.topic
.new(rollback
, self
.fake_session
, {"username": "other-project-name",
425 "password": "other-password",
426 "project_role_mappings": [{}]
428 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
429 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
430 self
.assertIn("format error at '{}' '{}'"
431 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
432 norm(str(e
.exception
)), "Wrong exception text")
433 with self
.subTest(i
=4):
435 with self
.assertRaises(EngineException
, msg
="Accepted wrong projects") as e
:
436 self
.topic
.new(rollback
, self
.fake_session
, {"username": "other-project-name",
437 "password": "other-password",
440 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
441 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
442 self
.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]),
443 norm(str(e
.exception
)), "Wrong exception text")
445 def test_edit_user(self
):
450 prms
= [{"project": pid1
, "project_name": "project-1", "role": rid1
, "role_name": "role-1"}]
451 user
= {"_id": uid
, "username": self
.test_name
, "project_role_mappings": prms
,
452 "_admin": {"created": now
, "modified": now
}}
453 with self
.subTest(i
=1):
454 self
.auth
.get_user_list
.side_effect
= [[user
], []]
455 self
.auth
.get_user
.return_value
= user
458 self
.auth
.get_project
.side_effect
= [{"_id": pid2
, "name": "project-2"},
459 {"_id": pid1
, "name": "project-1"}]
460 self
.auth
.get_role
.side_effect
= [{"_id": rid2
, "name": "role-2"},
461 {"_id": rid1
, "name": "role-1"}]
462 new_name
= "new-user-name"
463 new_pasw
= "new-password"
464 add_prms
= [{"project": pid2
, "role": rid2
}]
465 rem_prms
= [{"project": pid1
, "role": rid1
}]
466 self
.topic
.edit(self
.fake_session
, uid
, {"username": new_name
, "password": new_pasw
,
467 "add_project_role_mappings": add_prms
,
468 "remove_project_role_mappings": rem_prms
470 content
= self
.auth
.update_user
.call_args
[0][0]
471 self
.assertEqual(content
["_id"], uid
, "Wrong user identifier")
472 self
.assertEqual(content
["username"], new_name
, "Wrong user name")
473 self
.assertEqual(content
["password"], new_pasw
, "Wrong user password")
474 self
.assertEqual(content
["add_project_role_mappings"], add_prms
, "Wrong project-role mappings to add")
475 self
.assertEqual(content
["remove_project_role_mappings"], prms
, "Wrong project-role mappings to remove")
476 with self
.subTest(i
=2):
477 new_name
= "other-user-name"
479 self
.auth
.get_role_list
.side_effect
= [[user
], []]
480 with self
.assertRaises(EngineException
, msg
="Accepted wrong project-role mappings") as e
:
481 self
.topic
.edit(self
.fake_session
, uid
, {"username": new_name
, "project_role_mappings": new_prms
})
482 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
483 self
.assertIn("format error at '{}' '{}'"
484 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
485 norm(str(e
.exception
)), "Wrong exception text")
487 def test_delete_user(self
):
488 with self
.subTest(i
=1):
490 self
.fake_session
["username"] = self
.test_name
491 user
= user
= {"_id": uid
, "username": "other-user-name", "project_role_mappings": []}
492 self
.auth
.get_user
.return_value
= user
493 self
.auth
.delete_user
.return_value
= {"deleted": 1}
494 rc
= self
.topic
.delete(self
.fake_session
, uid
)
495 self
.assertEqual(rc
, {"deleted": 1}, "Wrong user deletion return info")
496 self
.assertEqual(self
.auth
.get_user
.call_args
[0][0], uid
, "Wrong user identifier")
497 self
.assertEqual(self
.auth
.delete_user
.call_args
[0][0], uid
, "Wrong user identifier")
499 def test_conflict_on_new(self
):
500 with self
.subTest(i
=1):
503 with self
.assertRaises(EngineException
, msg
="Accepted uuid as username") as e
:
504 self
.topic
.new(rollback
, self
.fake_session
, {"username": uid
, "password": self
.test_name
,
505 "projects": [test_pid
]})
506 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
507 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
508 self
.assertIn("username '{}' cannot have a uuid format".format(uid
),
509 norm(str(e
.exception
)), "Wrong exception text")
510 with self
.subTest(i
=2):
512 self
.auth
.get_user_list
.return_value
= [{"_id": str(uuid4()), "username": self
.test_name
}]
513 with self
.assertRaises(EngineException
, msg
="Accepted existing username") as e
:
514 self
.topic
.new(rollback
, self
.fake_session
, {"username": self
.test_name
, "password": self
.test_name
,
515 "projects": [test_pid
]})
516 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
517 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
518 self
.assertIn("username '{}' is already used".format(self
.test_name
),
519 norm(str(e
.exception
)), "Wrong exception text")
520 with self
.subTest(i
=3):
522 self
.auth
.get_user_list
.return_value
= []
523 self
.auth
.get_role_list
.side_effect
= [[], []]
524 with self
.assertRaises(AuthconnNotFoundException
, msg
="Accepted user without default role") as e
:
525 self
.topic
.new(rollback
, self
.fake_session
, {"username": self
.test_name
, "password": self
.test_name
,
526 "projects": [str(uuid4())]})
527 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
528 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.NOT_FOUND
, "Wrong HTTP status code")
529 self
.assertIn("can't find default role for user '{}'".format(self
.test_name
),
530 norm(str(e
.exception
)), "Wrong exception text")
532 def test_conflict_on_edit(self
):
534 with self
.subTest(i
=1):
535 self
.auth
.get_user_list
.return_value
= [{"_id": uid
, "username": self
.test_name
}]
536 new_name
= str(uuid4())
537 with self
.assertRaises(EngineException
, msg
="Accepted uuid as username") as e
:
538 self
.topic
.edit(self
.fake_session
, uid
, {"username": new_name
})
539 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
540 self
.assertIn("username '{}' cannot have an uuid format".format(new_name
),
541 norm(str(e
.exception
)), "Wrong exception text")
542 with self
.subTest(i
=2):
543 self
.auth
.get_user_list
.return_value
= [{"_id": uid
, "username": self
.test_name
}]
544 self
.auth
.get_role_list
.side_effect
= [[], []]
545 with self
.assertRaises(AuthconnNotFoundException
, msg
="Accepted user without default role") as e
:
546 self
.topic
.edit(self
.fake_session
, uid
, {"projects": [str(uuid4())]})
547 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.NOT_FOUND
, "Wrong HTTP status code")
548 self
.assertIn("can't find a default role for user '{}'".format(self
.test_name
),
549 norm(str(e
.exception
)), "Wrong exception text")
550 with self
.subTest(i
=3):
551 admin_uid
= str(uuid4())
552 self
.auth
.get_user_list
.return_value
= [{"_id": admin_uid
, "username": "admin"}]
553 with self
.assertRaises(EngineException
, msg
="Accepted removing system_admin role from admin user") as e
:
554 self
.topic
.edit(self
.fake_session
, admin_uid
,
555 {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]})
556 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.FORBIDDEN
, "Wrong HTTP status code")
557 self
.assertIn("you cannot remove system_admin role from admin user",
558 norm(str(e
.exception
)), "Wrong exception text")
559 with self
.subTest(i
=4):
560 new_name
= "new-user-name"
561 self
.auth
.get_user_list
.side_effect
= [[{"_id": uid
, "name": self
.test_name
}],
562 [{"_id": str(uuid4()), "name": new_name
}]]
563 with self
.assertRaises(EngineException
, msg
="Accepted existing username") as e
:
564 self
.topic
.edit(self
.fake_session
, uid
, {"username": new_name
})
565 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
566 self
.assertIn("username '{}' is already used".format(new_name
),
567 norm(str(e
.exception
)), "Wrong exception text")
569 def test_conflict_on_del(self
):
570 with self
.subTest(i
=1):
572 self
.fake_session
["username"] = self
.test_name
573 user
= user
= {"_id": uid
, "username": self
.test_name
, "project_role_mappings": []}
574 self
.auth
.get_user
.return_value
= user
575 with self
.assertRaises(EngineException
, msg
="Accepted deletion of own user") as e
:
576 self
.topic
.delete(self
.fake_session
, uid
)
577 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
578 self
.assertIn("you cannot delete your own login user", norm(str(e
.exception
)), "Wrong exception text")
581 class Test_CommonVimWimSdn(TestCase
):
585 cls
.test_name
= "test-cim-topic" # CIM = Common Infrastructure Manager
588 self
.db
= Mock(dbbase
.DbBase())
589 self
.fs
= Mock(fsbase
.FsBase())
590 self
.msg
= Mock(msgbase
.MsgBase())
591 self
.auth
= Mock(authconn
.Authconn(None, None))
592 self
.topic
= CommonVimWimSdn(self
.db
, self
.fs
, self
.msg
, self
.auth
)
593 # Use WIM schemas for testing because they are the simplest
594 self
.topic
.topic
= "wims"
595 self
.topic
.schema_new
= validation
.wim_account_new_schema
596 self
.topic
.schema_edit
= validation
.wim_account_edit_schema
597 self
.fake_session
= {"username": test_name
, "project_id": (test_pid
,), "method": None,
598 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
600 def test_new_cvws(self
):
601 test_url
= "http://0.0.0.0:0"
602 with self
.subTest(i
=1):
605 self
.db
.get_one
.return_value
= None
606 self
.db
.create
.side_effect
= lambda self
, content
: content
["_id"]
607 cid
, oid
= self
.topic
.new(rollback
, self
.fake_session
,
608 {"name": self
.test_name
, "wim_url": test_url
, "wim_type": test_type
})
609 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
610 args
= self
.db
.create
.call_args
[0]
612 self
.assertEqual(args
[0], self
.topic
.topic
, "Wrong topic")
613 self
.assertEqual(content
["_id"], cid
, "Wrong CIM identifier")
614 self
.assertEqual(content
["name"], self
.test_name
, "Wrong CIM name")
615 self
.assertEqual(content
["wim_url"], test_url
, "Wrong URL")
616 self
.assertEqual(content
["wim_type"], test_type
, "Wrong CIM type")
617 self
.assertEqual(content
["schema_version"], "1.11", "Wrong schema version")
618 self
.assertEqual(content
["op_id"], oid
, "Wrong operation identifier")
619 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
620 self
.assertEqual(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
621 self
.assertEqual(content
["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
622 self
.assertEqual(content
["_admin"]["projects_read"], [test_pid
], "Wrong read-only projects")
623 self
.assertEqual(content
["_admin"]["projects_write"], [test_pid
], "Wrong read/write projects")
624 self
.assertIsNone(content
["_admin"]["current_operation"], "Wrong current operation")
625 self
.assertEqual(len(content
["_admin"]["operations"]), 1, "Wrong number of operations")
626 operation
= content
["_admin"]["operations"][0]
627 self
.assertEqual(operation
["lcmOperationType"], "create", "Wrong operation type")
628 self
.assertEqual(operation
["operationState"], "PROCESSING", "Wrong operation state")
629 self
.assertGreater(operation
["startTime"], content
["_admin"]["created"], "Wrong operation start time")
630 self
.assertGreater(operation
["statusEnteredTime"], content
["_admin"]["created"],
631 "Wrong operation status enter time")
632 self
.assertEqual(operation
["detailed-status"], "", "Wrong operation detailed status info")
633 self
.assertIsNone(operation
["operationParams"], "Wrong operation parameters")
634 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
635 # with self.subTest(i=2):
637 # test_type = "bad_type"
638 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
639 # self.topic.new(rollback, self.fake_session,
640 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
641 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
642 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
643 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
644 # norm(str(e.exception)), "Wrong exception text")
646 def test_conflict_on_new(self
):
647 with self
.subTest(i
=1):
649 test_url
= "http://0.0.0.0:0"
651 self
.db
.get_one
.return_value
= {"_id": str(uuid4()), "name": self
.test_name
}
652 with self
.assertRaises(EngineException
, msg
="Accepted existing CIM name") as e
:
653 self
.topic
.new(rollback
, self
.fake_session
,
654 {"name": self
.test_name
, "wim_url": test_url
, "wim_type": test_type
})
655 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
656 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
657 self
.assertIn("name '{}' already exists for {}".format(self
.test_name
, self
.topic
.topic
),
658 norm(str(e
.exception
)), "Wrong exception text")
660 def test_edit_cvws(self
):
663 test_url
= "http://0.0.0.0:0"
665 cvws
= {"_id": cid
, "name": self
.test_name
, "wim_url": test_url
, "wim_type": test_type
,
666 "_admin": {"created": now
, "modified": now
, "operations": [{"lcmOperationType": "create"}]}}
667 with self
.subTest(i
=1):
668 new_name
= "new-cim-name"
669 new_url
= "https://1.1.1.1:1"
671 self
.db
.get_one
.side_effect
= [cvws
, None]
672 self
.db
.replace
.return_value
= {"updated": 1}
673 # self.db.encrypt.side_effect = [b64str(), b64str()]
674 self
.topic
.edit(self
.fake_session
, cid
, {"name": new_name
, "wim_url": new_url
, "wim_type": new_type
})
675 args
= self
.db
.replace
.call_args
[0]
677 self
.assertEqual(args
[0], self
.topic
.topic
, "Wrong topic")
678 self
.assertEqual(args
[1], cid
, "Wrong CIM identifier")
679 self
.assertEqual(content
["_id"], cid
, "Wrong CIM identifier")
680 self
.assertEqual(content
["name"], new_name
, "Wrong CIM name")
681 self
.assertEqual(content
["wim_type"], new_type
, "Wrong CIM type")
682 self
.assertEqual(content
["wim_url"], new_url
, "Wrong URL")
683 self
.assertEqual(content
["_admin"]["created"], now
, "Wrong creation time")
684 self
.assertGreater(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
685 self
.assertEqual(len(content
["_admin"]["operations"]), 2, "Wrong number of operations")
686 operation
= content
["_admin"]["operations"][1]
687 self
.assertEqual(operation
["lcmOperationType"], "edit", "Wrong operation type")
688 self
.assertEqual(operation
["operationState"], "PROCESSING", "Wrong operation state")
689 self
.assertGreater(operation
["startTime"], content
["_admin"]["modified"], "Wrong operation start time")
690 self
.assertGreater(operation
["statusEnteredTime"], content
["_admin"]["modified"],
691 "Wrong operation status enter time")
692 self
.assertEqual(operation
["detailed-status"], "", "Wrong operation detailed status info")
693 self
.assertIsNone(operation
["operationParams"], "Wrong operation parameters")
694 with self
.subTest(i
=2):
695 with self
.assertRaises(EngineException
, msg
="Accepted wrong property") as e
:
696 self
.topic
.edit(self
.fake_session
, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
697 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
698 self
.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)").
699 format("extra_prop"),
700 norm(str(e
.exception
)), "Wrong exception text")
702 def test_conflict_on_edit(self
):
703 with self
.subTest(i
=1):
705 new_name
= "new-cim-name"
706 self
.db
.get_one
.side_effect
= [{"_id": cid
, "name": self
.test_name
},
707 {"_id": str(uuid4()), "name": new_name
}]
708 with self
.assertRaises(EngineException
, msg
="Accepted existing CIM name") as e
:
709 self
.topic
.edit(self
.fake_session
, cid
, {"name": new_name
})
710 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
711 self
.assertIn("name '{}' already exists for {}".format(new_name
, self
.topic
.topic
),
712 norm(str(e
.exception
)), "Wrong exception text")
714 def test_delete_cvws(self
):
716 ro_pid
= str(uuid4())
717 rw_pid
= str(uuid4())
718 cvws
= {"_id": cid
, "name": self
.test_name
}
719 self
.db
.get_list
.return_value
= []
720 with self
.subTest(i
=1):
721 cvws
["_admin"] = {"projects_read": [test_pid
, ro_pid
, rw_pid
], "projects_write": [test_pid
, rw_pid
]}
722 self
.db
.get_one
.return_value
= cvws
723 oid
= self
.topic
.delete(self
.fake_session
, cid
)
724 self
.assertIsNone(oid
, "Wrong operation identifier")
725 self
.assertEqual(self
.db
.get_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
726 self
.assertEqual(self
.db
.get_one
.call_args
[0][1]["_id"], cid
, "Wrong CIM identifier")
727 self
.assertEqual(self
.db
.set_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
728 self
.assertEqual(self
.db
.set_one
.call_args
[0][1]["_id"], cid
, "Wrong CIM identifier")
729 self
.assertEqual(self
.db
.set_one
.call_args
[1]["update_dict"]["_admin.projects_read"], [ro_pid
, rw_pid
],
730 "Wrong read-only projects update")
731 self
.assertEqual(self
.db
.set_one
.call_args
[1]["update_dict"]["_admin.projects_write"], [rw_pid
],
732 "Wrong read/write projects update")
733 with self
.subTest(i
=2):
735 cvws
["_admin"] = {"projects_read": [test_pid
], "projects_write": [test_pid
], "operations": []}
736 self
.db
.get_one
.return_value
= cvws
737 oid
= self
.topic
.delete(self
.fake_session
, cid
)
738 self
.assertEqual(oid
, cid
+":0", "Wrong operation identifier")
739 self
.assertEqual(self
.db
.get_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
740 self
.assertEqual(self
.db
.get_one
.call_args
[0][1]["_id"], cid
, "Wrong CIM identifier")
741 self
.assertEqual(self
.db
.set_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
742 self
.assertEqual(self
.db
.set_one
.call_args
[0][1]["_id"], cid
, "Wrong user identifier")
743 update_dict
= self
.db
.set_one
.call_args
[1]["update_dict"]
744 self
.assertEqual(update_dict
["_admin.projects_read"], [], "Wrong read-only projects update")
745 self
.assertEqual(update_dict
["_admin.projects_write"], [], "Wrong read/write projects update")
746 self
.assertEqual(update_dict
["_admin.to_delete"], True, "Wrong deletion mark")
747 operation
= self
.db
.set_one
.call_args
[1]["push"]["_admin.operations"]
748 self
.assertEqual(operation
["lcmOperationType"], "delete", "Wrong operation type")
749 self
.assertEqual(operation
["operationState"], "PROCESSING", "Wrong operation state")
750 self
.assertEqual(operation
["detailed-status"], "", "Wrong operation detailed status")
751 self
.assertIsNone(operation
["operationParams"], "Wrong operation parameters")
752 self
.assertGreater(operation
["startTime"], now
, "Wrong operation start time")
753 self
.assertGreater(operation
["statusEnteredTime"], now
, "Wrong operation status enter time")
754 with self
.subTest(i
=3):
755 cvws
["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
756 self
.db
.get_one
.return_value
= cvws
757 self
.fake_session
["force"] = True # to force deletion
758 oid
= self
.topic
.delete(self
.fake_session
, cid
)
759 self
.assertIsNone(oid
, "Wrong operation identifier")
760 self
.assertEqual(self
.db
.get_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
761 self
.assertEqual(self
.db
.get_one
.call_args
[0][1]["_id"], cid
, "Wrong CIM identifier")
762 self
.assertEqual(self
.db
.del_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
763 self
.assertEqual(self
.db
.del_one
.call_args
[0][1]["_id"], cid
, "Wrong CIM identifier")
766 if __name__
== '__main__':