2 # -*- coding: utf-8 -*-
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 __author__
= "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__
= "$2019-10-019"
21 from unittest
import TestCase
22 from unittest
.mock
import Mock
, patch
, call
23 from uuid
import uuid4
24 from http
import HTTPStatus
26 from random
import randint
27 from osm_common
import dbbase
, fsbase
, msgbase
28 from osm_nbi
import authconn
, validation
29 from osm_nbi
.admin_topics
import (
36 from osm_nbi
.engine
import EngineException
37 from osm_nbi
.authconn
import AuthconnNotFoundException
40 test_pid
= str(uuid4())
41 test_name
= "test-user"
45 """Normalize string for checking"""
46 return ' '.join(str.strip().split()).lower()
49 class TestVcaTopic(TestCase
):
51 self
.db
= Mock(dbbase
.DbBase())
52 self
.fs
= Mock(fsbase
.FsBase())
53 self
.msg
= Mock(msgbase
.MsgBase())
54 self
.auth
= Mock(authconn
.Authconn(None, None, None))
55 self
.vca_topic
= VcaTopic(self
.db
, self
.fs
, self
.msg
, self
.auth
)
57 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
58 def test_format_on_new(self
, mock_super_format_on_new
):
61 "secret": "encrypted_secret",
62 "cacert": "encrypted_cacert",
64 self
.db
.encrypt
.side_effect
= ["secret", "cacert"]
65 mock_super_format_on_new
.return_value
= "1234"
67 oid
= self
.vca_topic
.format_on_new(content
)
69 self
.assertEqual(oid
, "1234")
70 self
.assertEqual(content
["secret"], "secret")
71 self
.assertEqual(content
["cacert"], "cacert")
72 self
.db
.encrypt
.assert_has_calls(
74 call("encrypted_secret", schema_version
="1.11", salt
="id"),
75 call("encrypted_cacert", schema_version
="1.11", salt
="id"),
78 mock_super_format_on_new
.assert_called_with(content
, None, False)
80 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
81 def test_format_on_edit(self
, mock_super_format_on_edit
):
84 "secret": "encrypted_secret",
85 "cacert": "encrypted_cacert",
89 "schema_version": "1.11",
91 self
.db
.encrypt
.side_effect
= ["secret", "cacert"]
92 mock_super_format_on_edit
.return_value
= "1234"
94 oid
= self
.vca_topic
.format_on_edit(final_content
, edit_content
)
96 self
.assertEqual(oid
, "1234")
97 self
.assertEqual(final_content
["secret"], "secret")
98 self
.assertEqual(final_content
["cacert"], "cacert")
99 self
.db
.encrypt
.assert_has_calls(
101 call("encrypted_secret", schema_version
="1.11", salt
="id"),
102 call("encrypted_cacert", schema_version
="1.11", salt
="id"),
105 mock_super_format_on_edit
.assert_called()
107 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
108 def test_check_conflict_on_del(self
, mock_check_conflict_on_del
):
110 "project_id": "project-id",
116 self
.db
.get_list
.return_value
= None
118 self
.vca_topic
.check_conflict_on_del(session
, _id
, db_content
)
120 self
.db
.get_list
.assert_called_with(
122 {"vca": _id
, '_admin.projects_read.cont': 'project-id'},
124 mock_check_conflict_on_del
.assert_called_with(session
, _id
, db_content
)
126 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
127 def test_check_conflict_on_del_force(self
, mock_check_conflict_on_del
):
129 "project_id": "project-id",
135 self
.vca_topic
.check_conflict_on_del(session
, _id
, db_content
)
137 self
.db
.get_list
.assert_not_called()
138 mock_check_conflict_on_del
.assert_not_called()
140 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
141 def test_check_conflict_on_del_with_conflict(self
, mock_check_conflict_on_del
):
143 "project_id": "project-id",
149 self
.db
.get_list
.return_value
= {"_id": "vim", "vca": "vca-id"}
151 with self
.assertRaises(EngineException
) as context
:
152 self
.vca_topic
.check_conflict_on_del(session
, _id
, db_content
)
156 "There is at least one VIM account using this vca",
157 http_code
=HTTPStatus
.CONFLICT
161 self
.db
.get_list
.assert_called_with(
163 {"vca": _id
, '_admin.projects_read.cont': 'project-id'},
165 mock_check_conflict_on_del
.assert_not_called()
168 class Test_ProjectTopicAuth(TestCase
):
172 cls
.test_name
= "test-project-topic"
175 self
.db
= Mock(dbbase
.DbBase())
176 self
.fs
= Mock(fsbase
.FsBase())
177 self
.msg
= Mock(msgbase
.MsgBase())
178 self
.auth
= Mock(authconn
.Authconn(None, None, None))
179 self
.topic
= ProjectTopicAuth(self
.db
, self
.fs
, self
.msg
, self
.auth
)
180 self
.fake_session
= {"username": self
.test_name
, "project_id": (test_pid
,), "method": None,
181 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
182 self
.topic
.check_quota
= Mock(return_value
=None) # skip quota
184 def test_new_project(self
):
185 with self
.subTest(i
=1):
188 self
.auth
.get_project_list
.return_value
= []
189 self
.auth
.create_project
.return_value
= pid1
190 pid2
, oid
= self
.topic
.new(rollback
, self
.fake_session
, {"name": self
.test_name
, "quotas": {}})
191 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
192 self
.assertEqual(pid2
, pid1
, "Wrong project identifier")
193 content
= self
.auth
.create_project
.call_args
[0][0]
194 self
.assertEqual(content
["name"], self
.test_name
, "Wrong project name")
195 self
.assertEqual(content
["quotas"], {}, "Wrong quotas")
196 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
197 self
.assertEqual(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
198 with self
.subTest(i
=2):
200 with self
.assertRaises(EngineException
, msg
="Accepted wrong quotas") as e
:
201 self
.topic
.new(rollback
, self
.fake_session
, {"name": "other-project-name", "quotas": {"baditems": 10}})
202 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
203 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
204 self
.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
205 .format("baditems"), norm(str(e
.exception
)), "Wrong exception text")
207 def test_edit_project(self
):
210 proj
= {"_id": pid
, "name": self
.test_name
, "_admin": {"created": now
, "modified": now
}}
211 with self
.subTest(i
=1):
212 self
.auth
.get_project_list
.side_effect
= [[proj
], []]
213 new_name
= "new-project-name"
214 quotas
= {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
215 self
.topic
.edit(self
.fake_session
, pid
, {"name": new_name
, "quotas": quotas
})
216 _id
, content
= self
.auth
.update_project
.call_args
[0]
217 self
.assertEqual(_id
, pid
, "Wrong project identifier")
218 self
.assertEqual(content
["_id"], pid
, "Wrong project identifier")
219 self
.assertEqual(content
["_admin"]["created"], now
, "Wrong creation time")
220 self
.assertGreater(content
["_admin"]["modified"], now
, "Wrong modification time")
221 self
.assertEqual(content
["name"], new_name
, "Wrong project name")
222 self
.assertEqual(content
["quotas"], quotas
, "Wrong quotas")
223 with self
.subTest(i
=2):
224 new_name
= "other-project-name"
225 quotas
= {"baditems": randint(0, 100)}
226 self
.auth
.get_project_list
.side_effect
= [[proj
], []]
227 with self
.assertRaises(EngineException
, msg
="Accepted wrong quotas") as e
:
228 self
.topic
.edit(self
.fake_session
, pid
, {"name": new_name
, "quotas": quotas
})
229 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
230 self
.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
231 .format("baditems"), norm(str(e
.exception
)), "Wrong exception text")
233 def test_conflict_on_new(self
):
234 with self
.subTest(i
=1):
237 with self
.assertRaises(EngineException
, msg
="Accepted uuid as project name") as e
:
238 self
.topic
.new(rollback
, self
.fake_session
, {"name": pid
})
239 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
240 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
241 self
.assertIn("project name '{}' cannot have an uuid format".format(pid
),
242 norm(str(e
.exception
)), "Wrong exception text")
243 with self
.subTest(i
=2):
245 self
.auth
.get_project_list
.return_value
= [{"_id": test_pid
, "name": self
.test_name
}]
246 with self
.assertRaises(EngineException
, msg
="Accepted existing project name") as e
:
247 self
.topic
.new(rollback
, self
.fake_session
, {"name": self
.test_name
})
248 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
249 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
250 self
.assertIn("project '{}' exists".format(self
.test_name
),
251 norm(str(e
.exception
)), "Wrong exception text")
253 def test_conflict_on_edit(self
):
254 with self
.subTest(i
=1):
255 self
.auth
.get_project_list
.return_value
= [{"_id": test_pid
, "name": self
.test_name
}]
256 new_name
= str(uuid4())
257 with self
.assertRaises(EngineException
, msg
="Accepted uuid as project name") as e
:
258 self
.topic
.edit(self
.fake_session
, test_pid
, {"name": new_name
})
259 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
260 self
.assertIn("project name '{}' cannot have an uuid format".format(new_name
),
261 norm(str(e
.exception
)), "Wrong exception text")
262 with self
.subTest(i
=2):
264 self
.auth
.get_project_list
.return_value
= [{"_id": pid
, "name": "admin"}]
265 with self
.assertRaises(EngineException
, msg
="Accepted renaming of project 'admin'") as e
:
266 self
.topic
.edit(self
.fake_session
, pid
, {"name": "new-name"})
267 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
268 self
.assertIn("you cannot rename project 'admin'",
269 norm(str(e
.exception
)), "Wrong exception text")
270 with self
.subTest(i
=3):
271 new_name
= "new-project-name"
272 self
.auth
.get_project_list
.side_effect
= [[{"_id": test_pid
, "name": self
.test_name
}],
273 [{"_id": str(uuid4()), "name": new_name
}]]
274 with self
.assertRaises(EngineException
, msg
="Accepted existing project name") as e
:
275 self
.topic
.edit(self
.fake_session
, pid
, {"name": new_name
})
276 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
277 self
.assertIn("project '{}' is already used".format(new_name
),
278 norm(str(e
.exception
)), "Wrong exception text")
280 def test_delete_project(self
):
281 with self
.subTest(i
=1):
283 self
.auth
.get_project
.return_value
= {"_id": pid
, "name": "other-project-name"}
284 self
.auth
.delete_project
.return_value
= {"deleted": 1}
285 self
.auth
.get_user_list
.return_value
= []
286 self
.db
.get_list
.return_value
= []
287 rc
= self
.topic
.delete(self
.fake_session
, pid
)
288 self
.assertEqual(rc
, {"deleted": 1}, "Wrong project deletion return info")
289 self
.assertEqual(self
.auth
.get_project
.call_args
[0][0], pid
, "Wrong project identifier")
290 self
.assertEqual(self
.auth
.delete_project
.call_args
[0][0], pid
, "Wrong project identifier")
292 def test_conflict_on_del(self
):
293 with self
.subTest(i
=1):
294 self
.auth
.get_project
.return_value
= {"_id": test_pid
, "name": self
.test_name
}
295 with self
.assertRaises(EngineException
, msg
="Accepted deletion of own project") as e
:
296 self
.topic
.delete(self
.fake_session
, self
.test_name
)
297 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
298 self
.assertIn("you cannot delete your own project", norm(str(e
.exception
)), "Wrong exception text")
299 with self
.subTest(i
=2):
300 self
.auth
.get_project
.return_value
= {"_id": str(uuid4()), "name": "admin"}
301 with self
.assertRaises(EngineException
, msg
="Accepted deletion of project 'admin'") as e
:
302 self
.topic
.delete(self
.fake_session
, "admin")
303 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
304 self
.assertIn("you cannot delete project 'admin'", norm(str(e
.exception
)), "Wrong exception text")
305 with self
.subTest(i
=3):
307 name
= "other-project-name"
308 self
.auth
.get_project
.return_value
= {"_id": pid
, "name": name
}
309 self
.auth
.get_user_list
.return_value
= [{"_id": str(uuid4()), "username": self
.test_name
,
310 "project_role_mappings": [{"project": pid
, "role": str(uuid4())}]}]
311 with self
.assertRaises(EngineException
, msg
="Accepted deletion of used project") as e
:
312 self
.topic
.delete(self
.fake_session
, pid
)
313 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
314 self
.assertIn("project '{}' ({}) is being used by user '{}'".format(name
, pid
, self
.test_name
),
315 norm(str(e
.exception
)), "Wrong exception text")
316 with self
.subTest(i
=4):
317 self
.auth
.get_user_list
.return_value
= []
318 self
.db
.get_list
.return_value
= [{"_id": str(uuid4()), "id": self
.test_name
,
319 "_admin": {"projects_read": [pid
], "projects_write": []}}]
320 with self
.assertRaises(EngineException
, msg
="Accepted deletion of used project") as e
:
321 self
.topic
.delete(self
.fake_session
, pid
)
322 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
323 self
.assertIn("project '{}' ({}) is being used by {} '{}'"
324 .format(name
, pid
, "vnf descriptor", self
.test_name
),
325 norm(str(e
.exception
)), "Wrong exception text")
328 class Test_RoleTopicAuth(TestCase
):
332 cls
.test_name
= "test-role-topic"
333 cls
.test_operations
= ["tokens:get"]
336 self
.db
= Mock(dbbase
.DbBase())
337 self
.fs
= Mock(fsbase
.FsBase())
338 self
.msg
= Mock(msgbase
.MsgBase())
339 self
.auth
= Mock(authconn
.Authconn(None, None, None))
340 self
.auth
.role_permissions
= self
.test_operations
341 self
.topic
= RoleTopicAuth(self
.db
, self
.fs
, self
.msg
, self
.auth
)
342 self
.fake_session
= {"username": test_name
, "project_id": (test_pid
,), "method": None,
343 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
344 self
.topic
.check_quota
= Mock(return_value
=None) # skip quota
346 def test_new_role(self
):
347 with self
.subTest(i
=1):
350 perms_in
= {"tokens": True}
351 perms_out
= {"default": False, "admin": False, "tokens": True}
352 self
.auth
.get_role_list
.return_value
= []
353 self
.auth
.create_role
.return_value
= rid1
354 rid2
, oid
= self
.topic
.new(rollback
, self
.fake_session
, {"name": self
.test_name
, "permissions": perms_in
})
355 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
356 self
.assertEqual(rid2
, rid1
, "Wrong project identifier")
357 content
= self
.auth
.create_role
.call_args
[0][0]
358 self
.assertEqual(content
["name"], self
.test_name
, "Wrong role name")
359 self
.assertEqual(content
["permissions"], perms_out
, "Wrong permissions")
360 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
361 self
.assertEqual(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
362 with self
.subTest(i
=2):
364 with self
.assertRaises(EngineException
, msg
="Accepted wrong permissions") as e
:
365 self
.topic
.new(rollback
, self
.fake_session
,
366 {"name": "other-role-name", "permissions": {"projects": True}})
367 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
368 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
369 self
.assertIn("invalid permission '{}'".format("projects"),
370 norm(str(e
.exception
)), "Wrong exception text")
372 def test_edit_role(self
):
375 role
= {"_id": rid
, "name": self
.test_name
, "permissions": {"tokens": True},
376 "_admin": {"created": now
, "modified": now
}}
377 with self
.subTest(i
=1):
378 self
.auth
.get_role_list
.side_effect
= [[role
], []]
379 self
.auth
.get_role
.return_value
= role
380 new_name
= "new-role-name"
381 perms_in
= {"tokens": False, "tokens:get": True}
382 perms_out
= {"default": False, "admin": False, "tokens": False, "tokens:get": True}
383 self
.topic
.edit(self
.fake_session
, rid
, {"name": new_name
, "permissions": perms_in
})
384 content
= self
.auth
.update_role
.call_args
[0][0]
385 self
.assertEqual(content
["_id"], rid
, "Wrong role identifier")
386 self
.assertEqual(content
["_admin"]["created"], now
, "Wrong creation time")
387 self
.assertGreater(content
["_admin"]["modified"], now
, "Wrong modification time")
388 self
.assertEqual(content
["name"], new_name
, "Wrong role name")
389 self
.assertEqual(content
["permissions"], perms_out
, "Wrong permissions")
390 with self
.subTest(i
=2):
391 new_name
= "other-role-name"
392 perms_in
= {"tokens": False, "tokens:post": True}
393 self
.auth
.get_role_list
.side_effect
= [[role
], []]
394 with self
.assertRaises(EngineException
, msg
="Accepted wrong permissions") as e
:
395 self
.topic
.edit(self
.fake_session
, rid
, {"name": new_name
, "permissions": perms_in
})
396 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
397 self
.assertIn("invalid permission '{}'".format("tokens:post"),
398 norm(str(e
.exception
)), "Wrong exception text")
400 def test_delete_role(self
):
401 with self
.subTest(i
=1):
403 role
= {"_id": rid
, "name": "other-role-name"}
404 self
.auth
.get_role_list
.return_value
= [role
]
405 self
.auth
.get_role
.return_value
= role
406 self
.auth
.delete_role
.return_value
= {"deleted": 1}
407 self
.auth
.get_user_list
.return_value
= []
408 rc
= self
.topic
.delete(self
.fake_session
, rid
)
409 self
.assertEqual(rc
, {"deleted": 1}, "Wrong role deletion return info")
410 self
.assertEqual(self
.auth
.get_role_list
.call_args
[0][0]["_id"], rid
, "Wrong role identifier")
411 self
.assertEqual(self
.auth
.get_role
.call_args
[0][0], rid
, "Wrong role identifier")
412 self
.assertEqual(self
.auth
.delete_role
.call_args
[0][0], rid
, "Wrong role identifier")
414 def test_conflict_on_new(self
):
415 with self
.subTest(i
=1):
418 with self
.assertRaises(EngineException
, msg
="Accepted uuid as role name") as e
:
419 self
.topic
.new(rollback
, self
.fake_session
, {"name": rid
})
420 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
421 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
422 self
.assertIn("role name '{}' cannot have an uuid format".format(rid
),
423 norm(str(e
.exception
)), "Wrong exception text")
424 with self
.subTest(i
=2):
426 self
.auth
.get_role_list
.return_value
= [{"_id": str(uuid4()), "name": self
.test_name
}]
427 with self
.assertRaises(EngineException
, msg
="Accepted existing role name") as e
:
428 self
.topic
.new(rollback
, self
.fake_session
, {"name": self
.test_name
})
429 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
430 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
431 self
.assertIn("role name '{}' exists".format(self
.test_name
),
432 norm(str(e
.exception
)), "Wrong exception text")
434 def test_conflict_on_edit(self
):
436 with self
.subTest(i
=1):
437 self
.auth
.get_role_list
.return_value
= [{"_id": rid
, "name": self
.test_name
, "permissions": {}}]
438 new_name
= str(uuid4())
439 with self
.assertRaises(EngineException
, msg
="Accepted uuid as role name") as e
:
440 self
.topic
.edit(self
.fake_session
, rid
, {"name": new_name
})
441 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
442 self
.assertIn("role name '{}' cannot have an uuid format".format(new_name
),
443 norm(str(e
.exception
)), "Wrong exception text")
444 for i
, role_name
in enumerate(["system_admin", "project_admin"], start
=2):
445 with self
.subTest(i
=i
):
447 self
.auth
.get_role
.return_value
= {"_id": rid
, "name": role_name
, "permissions": {}}
448 with self
.assertRaises(EngineException
, msg
="Accepted renaming of role '{}'".format(role_name
)) as e
:
449 self
.topic
.edit(self
.fake_session
, rid
, {"name": "new-name"})
450 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.FORBIDDEN
, "Wrong HTTP status code")
451 self
.assertIn("you cannot rename role '{}'".format(role_name
),
452 norm(str(e
.exception
)), "Wrong exception text")
453 with self
.subTest(i
=i
+1):
454 new_name
= "new-role-name"
455 self
.auth
.get_role_list
.side_effect
= [[{"_id": rid
, "name": self
.test_name
, "permissions": {}}],
456 [{"_id": str(uuid4()), "name": new_name
, "permissions": {}}]]
457 self
.auth
.get_role
.return_value
= {"_id": rid
, "name": self
.test_name
, "permissions": {}}
458 with self
.assertRaises(EngineException
, msg
="Accepted existing role name") as e
:
459 self
.topic
.edit(self
.fake_session
, rid
, {"name": new_name
})
460 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
461 self
.assertIn("role name '{}' exists".format(new_name
),
462 norm(str(e
.exception
)), "Wrong exception text")
464 def test_conflict_on_del(self
):
465 for i
, role_name
in enumerate(["system_admin", "project_admin"], start
=1):
466 with self
.subTest(i
=i
):
468 role
= {"_id": rid
, "name": role_name
}
469 self
.auth
.get_role_list
.return_value
= [role
]
470 self
.auth
.get_role
.return_value
= role
471 with self
.assertRaises(EngineException
, msg
="Accepted deletion of role '{}'".format(role_name
)) as e
:
472 self
.topic
.delete(self
.fake_session
, rid
)
473 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.FORBIDDEN
, "Wrong HTTP status code")
474 self
.assertIn("you cannot delete role '{}'".format(role_name
),
475 norm(str(e
.exception
)), "Wrong exception text")
476 with self
.subTest(i
=i
+1):
478 name
= "other-role-name"
479 role
= {"_id": rid
, "name": name
}
480 self
.auth
.get_role_list
.return_value
= [role
]
481 self
.auth
.get_role
.return_value
= role
482 self
.auth
.get_user_list
.return_value
= [{"_id": str(uuid4()), "username": self
.test_name
,
483 "project_role_mappings": [{"project": str(uuid4()), "role": rid
}]}]
484 with self
.assertRaises(EngineException
, msg
="Accepted deletion of used role") as e
:
485 self
.topic
.delete(self
.fake_session
, rid
)
486 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
487 self
.assertIn("role '{}' ({}) is being used by user '{}'".format(name
, rid
, self
.test_name
),
488 norm(str(e
.exception
)), "Wrong exception text")
491 class Test_UserTopicAuth(TestCase
):
495 cls
.test_name
= "test-user-topic"
498 self
.db
= Mock(dbbase
.DbBase())
499 self
.fs
= Mock(fsbase
.FsBase())
500 self
.msg
= Mock(msgbase
.MsgBase())
501 self
.auth
= Mock(authconn
.Authconn(None, None, None))
502 self
.topic
= UserTopicAuth(self
.db
, self
.fs
, self
.msg
, self
.auth
)
503 self
.fake_session
= {"username": test_name
, "project_id": (test_pid
,), "method": None,
504 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
505 self
.topic
.check_quota
= Mock(return_value
=None) # skip quota
507 def test_new_user(self
):
510 self
.auth
.get_user_list
.return_value
= []
511 self
.auth
.get_project
.return_value
= {"_id": pid
, "name": "some_project"}
512 self
.auth
.create_user
.return_value
= {"_id": uid1
, "username": self
.test_name
}
513 with self
.subTest(i
=1):
516 self
.auth
.get_role
.return_value
= {"_id": rid
, "name": "some_role"}
517 prms_in
= [{"project": "some_project", "role": "some_role"}]
518 prms_out
= [{"project": pid
, "role": rid
}]
519 uid2
, oid
= self
.topic
.new(rollback
, self
.fake_session
, {"username": self
.test_name
,
520 "password": self
.test_name
,
521 "project_role_mappings": prms_in
523 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
524 self
.assertEqual(uid2
, uid1
, "Wrong project identifier")
525 content
= self
.auth
.create_user
.call_args
[0][0]
526 self
.assertEqual(content
["username"], self
.test_name
, "Wrong project name")
527 self
.assertEqual(content
["password"], self
.test_name
, "Wrong password")
528 self
.assertEqual(content
["project_role_mappings"], prms_out
, "Wrong project-role mappings")
529 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
530 self
.assertEqual(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
531 with self
.subTest(i
=2):
533 def_rid
= str(uuid4())
534 def_role
= {"_id": def_rid
, "name": "project_admin"}
535 self
.auth
.get_role
.return_value
= def_role
536 self
.auth
.get_role_list
.return_value
= [def_role
]
537 prms_out
= [{"project": pid
, "role": def_rid
}]
538 uid2
, oid
= self
.topic
.new(rollback
, self
.fake_session
, {"username": self
.test_name
,
539 "password": self
.test_name
,
540 "projects": ["some_project"]
542 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
543 self
.assertEqual(uid2
, uid1
, "Wrong project identifier")
544 content
= self
.auth
.create_user
.call_args
[0][0]
545 self
.assertEqual(content
["username"], self
.test_name
, "Wrong project name")
546 self
.assertEqual(content
["password"], self
.test_name
, "Wrong password")
547 self
.assertEqual(content
["project_role_mappings"], prms_out
, "Wrong project-role mappings")
548 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
549 self
.assertEqual(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
550 with self
.subTest(i
=3):
552 with self
.assertRaises(EngineException
, msg
="Accepted wrong project-role mappings") as e
:
553 self
.topic
.new(rollback
, self
.fake_session
, {"username": "other-project-name",
554 "password": "other-password",
555 "project_role_mappings": [{}]
557 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
558 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
559 self
.assertIn("format error at '{}' '{}'"
560 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
561 norm(str(e
.exception
)), "Wrong exception text")
562 with self
.subTest(i
=4):
564 with self
.assertRaises(EngineException
, msg
="Accepted wrong projects") as e
:
565 self
.topic
.new(rollback
, self
.fake_session
, {"username": "other-project-name",
566 "password": "other-password",
569 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
570 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
571 self
.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]),
572 norm(str(e
.exception
)), "Wrong exception text")
574 def test_edit_user(self
):
579 prms
= [{"project": pid1
, "project_name": "project-1", "role": rid1
, "role_name": "role-1"}]
580 user
= {"_id": uid
, "username": self
.test_name
, "project_role_mappings": prms
,
581 "_admin": {"created": now
, "modified": now
}}
582 with self
.subTest(i
=1):
583 self
.auth
.get_user_list
.side_effect
= [[user
], []]
584 self
.auth
.get_user
.return_value
= user
587 self
.auth
.get_project
.side_effect
= [{"_id": pid2
, "name": "project-2"},
588 {"_id": pid1
, "name": "project-1"}]
589 self
.auth
.get_role
.side_effect
= [{"_id": rid2
, "name": "role-2"},
590 {"_id": rid1
, "name": "role-1"}]
591 new_name
= "new-user-name"
592 new_pasw
= "new-password"
593 add_prms
= [{"project": pid2
, "role": rid2
}]
594 rem_prms
= [{"project": pid1
, "role": rid1
}]
595 self
.topic
.edit(self
.fake_session
, uid
, {"username": new_name
, "password": new_pasw
,
596 "add_project_role_mappings": add_prms
,
597 "remove_project_role_mappings": rem_prms
599 content
= self
.auth
.update_user
.call_args
[0][0]
600 self
.assertEqual(content
["_id"], uid
, "Wrong user identifier")
601 self
.assertEqual(content
["username"], new_name
, "Wrong user name")
602 self
.assertEqual(content
["password"], new_pasw
, "Wrong user password")
603 self
.assertEqual(content
["add_project_role_mappings"], add_prms
, "Wrong project-role mappings to add")
604 self
.assertEqual(content
["remove_project_role_mappings"], prms
, "Wrong project-role mappings to remove")
605 with self
.subTest(i
=2):
606 new_name
= "other-user-name"
608 self
.auth
.get_role_list
.side_effect
= [[user
], []]
609 self
.auth
.get_user_list
.side_effect
= [[user
]]
610 with self
.assertRaises(EngineException
, msg
="Accepted wrong project-role mappings") as e
:
611 self
.topic
.edit(self
.fake_session
, uid
, {"username": new_name
, "project_role_mappings": new_prms
})
612 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
613 self
.assertIn("format error at '{}' '{}'"
614 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
615 norm(str(e
.exception
)), "Wrong exception text")
617 def test_delete_user(self
):
618 with self
.subTest(i
=1):
620 self
.fake_session
["username"] = self
.test_name
621 user
= user
= {"_id": uid
, "username": "other-user-name", "project_role_mappings": []}
622 self
.auth
.get_user
.return_value
= user
623 self
.auth
.delete_user
.return_value
= {"deleted": 1}
624 rc
= self
.topic
.delete(self
.fake_session
, uid
)
625 self
.assertEqual(rc
, {"deleted": 1}, "Wrong user deletion return info")
626 self
.assertEqual(self
.auth
.get_user
.call_args
[0][0], uid
, "Wrong user identifier")
627 self
.assertEqual(self
.auth
.delete_user
.call_args
[0][0], uid
, "Wrong user identifier")
629 def test_conflict_on_new(self
):
630 with self
.subTest(i
=1):
633 with self
.assertRaises(EngineException
, msg
="Accepted uuid as username") as e
:
634 self
.topic
.new(rollback
, self
.fake_session
, {"username": uid
, "password": self
.test_name
,
635 "projects": [test_pid
]})
636 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
637 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
638 self
.assertIn("username '{}' cannot have a uuid format".format(uid
),
639 norm(str(e
.exception
)), "Wrong exception text")
640 with self
.subTest(i
=2):
642 self
.auth
.get_user_list
.return_value
= [{"_id": str(uuid4()), "username": self
.test_name
}]
643 with self
.assertRaises(EngineException
, msg
="Accepted existing username") as e
:
644 self
.topic
.new(rollback
, self
.fake_session
, {"username": self
.test_name
, "password": self
.test_name
,
645 "projects": [test_pid
]})
646 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
647 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
648 self
.assertIn("username '{}' is already used".format(self
.test_name
),
649 norm(str(e
.exception
)), "Wrong exception text")
650 with self
.subTest(i
=3):
652 self
.auth
.get_user_list
.return_value
= []
653 self
.auth
.get_role_list
.side_effect
= [[], []]
654 with self
.assertRaises(AuthconnNotFoundException
, msg
="Accepted user without default role") as e
:
655 self
.topic
.new(rollback
, self
.fake_session
, {"username": self
.test_name
, "password": self
.test_name
,
656 "projects": [str(uuid4())]})
657 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
658 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.NOT_FOUND
, "Wrong HTTP status code")
659 self
.assertIn("can't find default role for user '{}'".format(self
.test_name
),
660 norm(str(e
.exception
)), "Wrong exception text")
662 def test_conflict_on_edit(self
):
664 with self
.subTest(i
=1):
665 self
.auth
.get_user_list
.return_value
= [{"_id": uid
, "username": self
.test_name
}]
666 new_name
= str(uuid4())
667 with self
.assertRaises(EngineException
, msg
="Accepted uuid as username") as e
:
668 self
.topic
.edit(self
.fake_session
, uid
, {"username": new_name
})
669 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
670 self
.assertIn("username '{}' cannot have an uuid format".format(new_name
),
671 norm(str(e
.exception
)), "Wrong exception text")
672 with self
.subTest(i
=2):
673 self
.auth
.get_user_list
.return_value
= [{"_id": uid
, "username": self
.test_name
}]
674 self
.auth
.get_role_list
.side_effect
= [[], []]
675 with self
.assertRaises(AuthconnNotFoundException
, msg
="Accepted user without default role") as e
:
676 self
.topic
.edit(self
.fake_session
, uid
, {"projects": [str(uuid4())]})
677 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.NOT_FOUND
, "Wrong HTTP status code")
678 self
.assertIn("can't find a default role for user '{}'".format(self
.test_name
),
679 norm(str(e
.exception
)), "Wrong exception text")
680 with self
.subTest(i
=3):
681 admin_uid
= str(uuid4())
682 self
.auth
.get_user_list
.return_value
= [{"_id": admin_uid
, "username": "admin"}]
683 with self
.assertRaises(EngineException
, msg
="Accepted removing system_admin role from admin user") as e
:
684 self
.topic
.edit(self
.fake_session
, admin_uid
,
685 {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]})
686 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.FORBIDDEN
, "Wrong HTTP status code")
687 self
.assertIn("you cannot remove system_admin role from admin user",
688 norm(str(e
.exception
)), "Wrong exception text")
689 with self
.subTest(i
=4):
690 new_name
= "new-user-name"
691 self
.auth
.get_user_list
.side_effect
= [[{"_id": uid
, "name": self
.test_name
}],
692 [{"_id": str(uuid4()), "name": new_name
}]]
693 with self
.assertRaises(EngineException
, msg
="Accepted existing username") as e
:
694 self
.topic
.edit(self
.fake_session
, uid
, {"username": new_name
})
695 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
696 self
.assertIn("username '{}' is already used".format(new_name
),
697 norm(str(e
.exception
)), "Wrong exception text")
699 def test_conflict_on_del(self
):
700 with self
.subTest(i
=1):
702 self
.fake_session
["username"] = self
.test_name
703 user
= user
= {"_id": uid
, "username": self
.test_name
, "project_role_mappings": []}
704 self
.auth
.get_user
.return_value
= user
705 with self
.assertRaises(EngineException
, msg
="Accepted deletion of own user") as e
:
706 self
.topic
.delete(self
.fake_session
, uid
)
707 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
708 self
.assertIn("you cannot delete your own login user", norm(str(e
.exception
)), "Wrong exception text")
711 class Test_CommonVimWimSdn(TestCase
):
715 cls
.test_name
= "test-cim-topic" # CIM = Common Infrastructure Manager
718 self
.db
= Mock(dbbase
.DbBase())
719 self
.fs
= Mock(fsbase
.FsBase())
720 self
.msg
= Mock(msgbase
.MsgBase())
721 self
.auth
= Mock(authconn
.Authconn(None, None, None))
722 self
.topic
= CommonVimWimSdn(self
.db
, self
.fs
, self
.msg
, self
.auth
)
723 # Use WIM schemas for testing because they are the simplest
724 self
.topic
._send
_msg
= Mock()
725 self
.topic
.topic
= "wims"
726 self
.topic
.schema_new
= validation
.wim_account_new_schema
727 self
.topic
.schema_edit
= validation
.wim_account_edit_schema
728 self
.fake_session
= {"username": test_name
, "project_id": (test_pid
,), "method": None,
729 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
730 self
.topic
.check_quota
= Mock(return_value
=None) # skip quota
732 def test_new_cvws(self
):
733 test_url
= "http://0.0.0.0:0"
734 with self
.subTest(i
=1):
737 self
.db
.get_one
.return_value
= None
738 self
.db
.create
.side_effect
= lambda self
, content
: content
["_id"]
739 cid
, oid
= self
.topic
.new(rollback
, self
.fake_session
,
740 {"name": self
.test_name
, "wim_url": test_url
, "wim_type": test_type
})
741 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
742 args
= self
.db
.create
.call_args
[0]
744 self
.assertEqual(args
[0], self
.topic
.topic
, "Wrong topic")
745 self
.assertEqual(content
["_id"], cid
, "Wrong CIM identifier")
746 self
.assertEqual(content
["name"], self
.test_name
, "Wrong CIM name")
747 self
.assertEqual(content
["wim_url"], test_url
, "Wrong URL")
748 self
.assertEqual(content
["wim_type"], test_type
, "Wrong CIM type")
749 self
.assertEqual(content
["schema_version"], "1.11", "Wrong schema version")
750 self
.assertEqual(content
["op_id"], oid
, "Wrong operation identifier")
751 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
752 self
.assertEqual(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
753 self
.assertEqual(content
["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
754 self
.assertEqual(content
["_admin"]["projects_read"], [test_pid
], "Wrong read-only projects")
755 self
.assertEqual(content
["_admin"]["projects_write"], [test_pid
], "Wrong read/write projects")
756 self
.assertIsNone(content
["_admin"]["current_operation"], "Wrong current operation")
757 self
.assertEqual(len(content
["_admin"]["operations"]), 1, "Wrong number of operations")
758 operation
= content
["_admin"]["operations"][0]
759 self
.assertEqual(operation
["lcmOperationType"], "create", "Wrong operation type")
760 self
.assertEqual(operation
["operationState"], "PROCESSING", "Wrong operation state")
761 self
.assertGreater(operation
["startTime"], content
["_admin"]["created"], "Wrong operation start time")
762 self
.assertGreater(operation
["statusEnteredTime"], content
["_admin"]["created"],
763 "Wrong operation status enter time")
764 self
.assertEqual(operation
["detailed-status"], "", "Wrong operation detailed status info")
765 self
.assertIsNone(operation
["operationParams"], "Wrong operation parameters")
766 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
767 # with self.subTest(i=2):
769 # test_type = "bad_type"
770 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
771 # self.topic.new(rollback, self.fake_session,
772 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
773 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
774 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
775 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
776 # norm(str(e.exception)), "Wrong exception text")
778 def test_conflict_on_new(self
):
779 with self
.subTest(i
=1):
781 test_url
= "http://0.0.0.0:0"
783 self
.db
.get_one
.return_value
= {"_id": str(uuid4()), "name": self
.test_name
}
784 with self
.assertRaises(EngineException
, msg
="Accepted existing CIM name") as e
:
785 self
.topic
.new(rollback
, self
.fake_session
,
786 {"name": self
.test_name
, "wim_url": test_url
, "wim_type": test_type
})
787 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
788 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
789 self
.assertIn("name '{}' already exists for {}".format(self
.test_name
, self
.topic
.topic
),
790 norm(str(e
.exception
)), "Wrong exception text")
792 def test_edit_cvws(self
):
795 test_url
= "http://0.0.0.0:0"
797 cvws
= {"_id": cid
, "name": self
.test_name
, "wim_url": test_url
, "wim_type": test_type
,
798 "_admin": {"created": now
, "modified": now
, "operations": [{"lcmOperationType": "create"}]}}
799 with self
.subTest(i
=1):
800 new_name
= "new-cim-name"
801 new_url
= "https://1.1.1.1:1"
803 self
.db
.get_one
.side_effect
= [cvws
, None]
804 self
.db
.replace
.return_value
= {"updated": 1}
805 # self.db.encrypt.side_effect = [b64str(), b64str()]
806 self
.topic
.edit(self
.fake_session
, cid
, {"name": new_name
, "wim_url": new_url
, "wim_type": new_type
})
807 args
= self
.db
.replace
.call_args
[0]
809 self
.assertEqual(args
[0], self
.topic
.topic
, "Wrong topic")
810 self
.assertEqual(args
[1], cid
, "Wrong CIM identifier")
811 self
.assertEqual(content
["_id"], cid
, "Wrong CIM identifier")
812 self
.assertEqual(content
["name"], new_name
, "Wrong CIM name")
813 self
.assertEqual(content
["wim_type"], new_type
, "Wrong CIM type")
814 self
.assertEqual(content
["wim_url"], new_url
, "Wrong URL")
815 self
.assertEqual(content
["_admin"]["created"], now
, "Wrong creation time")
816 self
.assertGreater(content
["_admin"]["modified"], content
["_admin"]["created"], "Wrong modification time")
817 self
.assertEqual(len(content
["_admin"]["operations"]), 2, "Wrong number of operations")
818 operation
= content
["_admin"]["operations"][1]
819 self
.assertEqual(operation
["lcmOperationType"], "edit", "Wrong operation type")
820 self
.assertEqual(operation
["operationState"], "PROCESSING", "Wrong operation state")
821 self
.assertGreater(operation
["startTime"], content
["_admin"]["modified"], "Wrong operation start time")
822 self
.assertGreater(operation
["statusEnteredTime"], content
["_admin"]["modified"],
823 "Wrong operation status enter time")
824 self
.assertEqual(operation
["detailed-status"], "", "Wrong operation detailed status info")
825 self
.assertIsNone(operation
["operationParams"], "Wrong operation parameters")
826 with self
.subTest(i
=2):
827 self
.db
.get_one
.side_effect
= [cvws
]
828 with self
.assertRaises(EngineException
, msg
="Accepted wrong property") as e
:
829 self
.topic
.edit(self
.fake_session
, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
830 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.UNPROCESSABLE_ENTITY
, "Wrong HTTP status code")
831 self
.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)").
832 format("extra_prop"),
833 norm(str(e
.exception
)), "Wrong exception text")
835 def test_conflict_on_edit(self
):
836 with self
.subTest(i
=1):
838 new_name
= "new-cim-name"
839 self
.db
.get_one
.side_effect
= [{"_id": cid
, "name": self
.test_name
},
840 {"_id": str(uuid4()), "name": new_name
}]
841 with self
.assertRaises(EngineException
, msg
="Accepted existing CIM name") as e
:
842 self
.topic
.edit(self
.fake_session
, cid
, {"name": new_name
})
843 self
.assertEqual(e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code")
844 self
.assertIn("name '{}' already exists for {}".format(new_name
, self
.topic
.topic
),
845 norm(str(e
.exception
)), "Wrong exception text")
847 def test_delete_cvws(self
):
849 ro_pid
= str(uuid4())
850 rw_pid
= str(uuid4())
851 cvws
= {"_id": cid
, "name": self
.test_name
}
852 self
.db
.get_list
.return_value
= []
853 with self
.subTest(i
=1):
854 cvws
["_admin"] = {"projects_read": [test_pid
, ro_pid
, rw_pid
], "projects_write": [test_pid
, rw_pid
]}
855 self
.db
.get_one
.return_value
= cvws
856 oid
= self
.topic
.delete(self
.fake_session
, cid
)
857 self
.assertIsNone(oid
, "Wrong operation identifier")
858 self
.assertEqual(self
.db
.get_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
859 self
.assertEqual(self
.db
.get_one
.call_args
[0][1]["_id"], cid
, "Wrong CIM identifier")
860 self
.assertEqual(self
.db
.set_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
861 self
.assertEqual(self
.db
.set_one
.call_args
[0][1]["_id"], cid
, "Wrong CIM identifier")
862 self
.assertEqual(self
.db
.set_one
.call_args
[1]["update_dict"], None,
863 "Wrong read-only projects update")
864 self
.assertEqual(self
.db
.set_one
.call_args
[1]["pull_list"],
865 {"_admin.projects_read": (test_pid
,), "_admin.projects_write": (test_pid
,)},
866 "Wrong read/write projects update")
867 self
.topic
._send
_msg
.assert_not_called()
868 with self
.subTest(i
=2):
870 cvws
["_admin"] = {"projects_read": [test_pid
], "projects_write": [test_pid
], "operations": []}
871 self
.db
.get_one
.return_value
= cvws
872 oid
= self
.topic
.delete(self
.fake_session
, cid
)
873 self
.assertEqual(oid
, cid
+":0", "Wrong operation identifier")
874 self
.assertEqual(self
.db
.get_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
875 self
.assertEqual(self
.db
.get_one
.call_args
[0][1]["_id"], cid
, "Wrong CIM identifier")
876 self
.assertEqual(self
.db
.set_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
877 self
.assertEqual(self
.db
.set_one
.call_args
[0][1]["_id"], cid
, "Wrong user identifier")
878 self
.assertEqual(self
.db
.set_one
.call_args
[1]["update_dict"], {"_admin.to_delete": True},
879 "Wrong _admin.to_delete update")
880 operation
= self
.db
.set_one
.call_args
[1]["push"]["_admin.operations"]
881 self
.assertEqual(operation
["lcmOperationType"], "delete", "Wrong operation type")
882 self
.assertEqual(operation
["operationState"], "PROCESSING", "Wrong operation state")
883 self
.assertEqual(operation
["detailed-status"], "", "Wrong operation detailed status")
884 self
.assertIsNone(operation
["operationParams"], "Wrong operation parameters")
885 self
.assertGreater(operation
["startTime"], now
, "Wrong operation start time")
886 self
.assertGreater(operation
["statusEnteredTime"], now
, "Wrong operation status enter time")
887 self
.topic
._send
_msg
.assert_called_once_with("delete", {"_id": cid
, "op_id": cid
+ ":0"}, not_send_msg
=None)
888 with self
.subTest(i
=3):
889 cvws
["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
890 self
.db
.get_one
.return_value
= cvws
891 self
.topic
._send
_msg
.reset_mock()
892 self
.db
.get_one
.reset_mock()
893 self
.db
.del_one
.reset_mock()
894 self
.fake_session
["force"] = True # to force deletion
895 self
.fake_session
["admin"] = True # to force deletion
896 self
.fake_session
["project_id"] = [] # to force deletion
897 oid
= self
.topic
.delete(self
.fake_session
, cid
)
898 self
.assertIsNone(oid
, "Wrong operation identifier")
899 self
.assertEqual(self
.db
.get_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
900 self
.assertEqual(self
.db
.get_one
.call_args
[0][1]["_id"], cid
, "Wrong CIM identifier")
901 self
.assertEqual(self
.db
.del_one
.call_args
[0][0], self
.topic
.topic
, "Wrong topic")
902 self
.assertEqual(self
.db
.del_one
.call_args
[0][1]["_id"], cid
, "Wrong CIM identifier")
903 self
.topic
._send
_msg
.assert_called_once_with("deleted", {"_id": cid
, "op_id": None}, not_send_msg
=None)
906 if __name__
== '__main__':