blob: 74528f8f62354567cd2f6d0d72cc9177c71afa17 [file] [log] [blame]
delacruzramo79e40f42019-10-10 16:36:40 +02001#! /usr/bin/python3
2# -*- coding: utf-8 -*-
3
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18__date__ = "$2019-10-019"
19
20import unittest
21from unittest import TestCase
David Garciaecb41322021-03-31 19:10:46 +020022from unittest.mock import Mock, patch, call
delacruzramo79e40f42019-10-10 16:36:40 +020023from uuid import uuid4
24from http import HTTPStatus
25from time import time
26from random import randint
27from osm_common import dbbase, fsbase, msgbase
28from osm_nbi import authconn, validation
David Garciaecb41322021-03-31 19:10:46 +020029from osm_nbi.admin_topics import (
30 ProjectTopicAuth,
31 RoleTopicAuth,
32 UserTopicAuth,
33 CommonVimWimSdn,
34 VcaTopic,
35)
delacruzramo79e40f42019-10-10 16:36:40 +020036from osm_nbi.engine import EngineException
37from osm_nbi.authconn import AuthconnNotFoundException
38
39
# Username placed in the fake sessions built by the role and user test fixtures.
test_name = "test-user"
# Random project id placed in every fake session's "project_id" tuple; regenerated on each import.
test_pid = str(uuid4())
42
43
def norm(str):
    """Return the text lowercased, trimmed, and with whitespace runs collapsed to one space."""
    # split() with no separator already ignores leading/trailing whitespace.
    words = str.split()
    return " ".join(words).lower()
47
48
class TestVcaTopic(TestCase):
    """Unit tests for ``VcaTopic`` run against mocked db, fs, msg and auth collaborators."""

    def setUp(self):
        # Fresh mocks for every test so recorded calls never leak between tests.
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
    def test_format_on_new(self, mock_super_format_on_new):
        # The parent format_on_new is patched out; only the VcaTopic-specific handling is checked.
        content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        # One canned db.encrypt() answer per expected call, in call order.
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_new.return_value = "1234"

        oid = self.vca_topic.format_on_new(content)

        self.assertEqual(oid, "1234")
        self.assertEqual(content["secret"], "secret")
        self.assertEqual(content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_new.assert_called_with(content, None, False)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
    def test_format_on_edit(self, mock_super_format_on_edit):
        # Values supplied in edit_content must end up, replaced by db.encrypt() results, in final_content.
        edit_content = {
            "_id": "id",
            "secret": "encrypted_secret",
            "cacert": "encrypted_cacert",
        }
        final_content = {
            "_id": "id",
            "schema_version": "1.11",
        }
        self.db.encrypt.side_effect = ["secret", "cacert"]
        mock_super_format_on_edit.return_value = "1234"

        oid = self.vca_topic.format_on_edit(final_content, edit_content)

        self.assertEqual(oid, "1234")
        self.assertEqual(final_content["secret"], "secret")
        self.assertEqual(final_content["cacert"], "cacert")
        self.db.encrypt.assert_has_calls(
            [
                call("encrypted_secret", schema_version="1.11", salt="id"),
                call("encrypted_cacert", schema_version="1.11", salt="id"),
            ]
        )
        mock_super_format_on_edit.assert_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del(self, mock_check_conflict_on_del):
        # No VIM account references the VCA, so the check must fall through to the parent class.
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}

        self.db.get_list.return_value = None

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_called_with(session, _id, db_content)

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
        # With "force" set, neither the database nor the parent check may be consulted.
        session = {
            "project_id": "project-id",
            "force": True,
        }
        _id = "vca-id"
        db_content = {}

        self.vca_topic.check_conflict_on_del(session, _id, db_content)

        self.db.get_list.assert_not_called()
        mock_check_conflict_on_del.assert_not_called()

    @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
    def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
        # A VIM account still references the VCA, so deletion must be rejected with CONFLICT.
        session = {
            "project_id": "project-id",
            "force": False,
        }
        _id = "vca-id"
        db_content = {}

        self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}

        with self.assertRaises(EngineException) as context:
            self.vca_topic.check_conflict_on_del(session, _id, db_content)
        # Inspect the exception only after the ``with`` block: statements placed after the raising
        # call inside the block never run, and two distinct exception instances cannot be compared
        # reliably with assertEqual, so the status code and message are checked individually.
        self.assertEqual(context.exception.http_code, HTTPStatus.CONFLICT)
        self.assertIn(
            "There is at least one VIM account using this vca",
            str(context.exception),
        )

        self.db.get_list.assert_called_with(
            "vim_accounts",
            {"vca": _id, "_admin.projects_read.cont": "project-id"},
        )
        mock_check_conflict_on_del.assert_not_called()
166
167
class Test_ProjectTopicAuth(TestCase):
    """Exercise ``ProjectTopicAuth`` new/edit/delete and their conflict checks against mocked backends."""

    @classmethod
    def setUpClass(cls):
        # Shared by all tests: used as the session username and as the reference project name.
        cls.test_name = "test-project-topic"

    def setUp(self):
        # Fresh mocks per test so canned answers and recorded calls start clean.
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {
            "username": self.test_name,
            "project_id": (test_pid,),
            "method": None,
            "admin": True,
            "force": False,
            "public": False,
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def _assert_engine_error(self, raised, http_code, text):
        # Common tail of every negative case: status code first, then the normalized message.
        self.assertEqual(raised.exception.http_code, http_code, "Wrong HTTP status code")
        self.assertIn(text, norm(str(raised.exception)), "Wrong exception text")

    def test_new_project(self):
        # Case 1: a valid project reaches auth.create_project and one rollback entry is recorded.
        with self.subTest(i=1):
            undo_log = []
            expected_pid = str(uuid4())
            self.auth.get_project_list.return_value = []
            self.auth.create_project.return_value = expected_pid
            indata = {"name": self.test_name, "quotas": {}}
            returned_pid, _ = self.topic.new(undo_log, self.fake_session, indata)
            self.assertEqual(len(undo_log), 1, "Wrong rollback length")
            self.assertEqual(returned_pid, expected_pid, "Wrong project identifier")
            created = self.auth.create_project.call_args[0][0]
            self.assertEqual(created["name"], self.test_name, "Wrong project name")
            self.assertEqual(created["quotas"], {}, "Wrong quotas")
            admin_info = created["_admin"]
            self.assertIsNotNone(admin_info["created"], "Wrong creation time")
            self.assertEqual(admin_info["modified"], admin_info["created"], "Wrong modification time")
        # Case 2: an unknown quota item is rejected and nothing is queued for rollback.
        with self.subTest(i=2):
            undo_log = []
            bad_indata = {"name": "other-project-name", "quotas": {"baditems": 10}}
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as raised:
                self.topic.new(undo_log, self.fake_session, bad_indata)
            self.assertEqual(len(undo_log), 0, "Wrong rollback length")
            expected = "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'" \
                .format("baditems")
            self._assert_engine_error(raised, HTTPStatus.UNPROCESSABLE_ENTITY, expected)

    def test_edit_project(self):
        created_at = time()
        project_id = str(uuid4())
        stored_project = {
            "_id": project_id,
            "name": self.test_name,
            "_admin": {"created": created_at, "modified": created_at},
        }
        # Case 1: renaming plus valid quotas is forwarded to auth.update_project.
        with self.subTest(i=1):
            self.auth.get_project_list.side_effect = [[stored_project], []]
            renamed = "new-project-name"
            new_quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
            self.topic.edit(self.fake_session, project_id, {"name": renamed, "quotas": new_quotas})
            updated_id, updated = self.auth.update_project.call_args[0]
            self.assertEqual(updated_id, project_id, "Wrong project identifier")
            self.assertEqual(updated["_id"], project_id, "Wrong project identifier")
            self.assertEqual(updated["_admin"]["created"], created_at, "Wrong creation time")
            self.assertGreater(updated["_admin"]["modified"], created_at, "Wrong modification time")
            self.assertEqual(updated["name"], renamed, "Wrong project name")
            self.assertEqual(updated["quotas"], new_quotas, "Wrong quotas")
        # Case 2: an unknown quota item is rejected.
        with self.subTest(i=2):
            renamed = "other-project-name"
            new_quotas = {"baditems": randint(0, 100)}
            self.auth.get_project_list.side_effect = [[stored_project], []]
            with self.assertRaises(EngineException, msg="Accepted wrong quotas") as raised:
                self.topic.edit(self.fake_session, project_id, {"name": renamed, "quotas": new_quotas})
            expected = "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'" \
                .format("baditems")
            self._assert_engine_error(raised, HTTPStatus.UNPROCESSABLE_ENTITY, expected)

    def test_conflict_on_new(self):
        # Case 1: a uuid-shaped project name is refused.
        with self.subTest(i=1):
            undo_log = []
            uuid_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as raised:
                self.topic.new(undo_log, self.fake_session, {"name": uuid_name})
            self.assertEqual(len(undo_log), 0, "Wrong rollback length")
            self._assert_engine_error(
                raised,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "project name '{}' cannot have an uuid format".format(uuid_name),
            )
        # Case 2: a name the auth backend already lists is refused.
        with self.subTest(i=2):
            undo_log = []
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing project name") as raised:
                self.topic.new(undo_log, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(undo_log), 0, "Wrong rollback length")
            self._assert_engine_error(
                raised,
                HTTPStatus.CONFLICT,
                "project '{}' exists".format(self.test_name),
            )

    def test_conflict_on_edit(self):
        # Case 1: renaming to a uuid-shaped name is refused.
        with self.subTest(i=1):
            self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
            uuid_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as project name") as raised:
                self.topic.edit(self.fake_session, test_pid, {"name": uuid_name})
            self._assert_engine_error(
                raised,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "project name '{}' cannot have an uuid format".format(uuid_name),
            )
        # Case 2: project 'admin' cannot be renamed.
        with self.subTest(i=2):
            admin_pid = str(uuid4())
            self.auth.get_project_list.return_value = [{"_id": admin_pid, "name": "admin"}]
            with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as raised:
                self.topic.edit(self.fake_session, admin_pid, {"name": "new-name"})
            self._assert_engine_error(raised, HTTPStatus.CONFLICT, "you cannot rename project 'admin'")
        # Case 3: renaming to a name owned by another project is refused.
        # The id generated in case 2 is reused as the edit target, as in the original test.
        with self.subTest(i=3):
            taken_name = "new-project-name"
            current_project = {"_id": test_pid, "name": self.test_name}
            other_project = {"_id": str(uuid4()), "name": taken_name}
            self.auth.get_project_list.side_effect = [[current_project], [other_project]]
            with self.assertRaises(EngineException, msg="Accepted existing project name") as raised:
                self.topic.edit(self.fake_session, admin_pid, {"name": taken_name})
            self._assert_engine_error(
                raised,
                HTTPStatus.CONFLICT,
                "project '{}' is already used".format(taken_name),
            )

    def test_delete_project(self):
        # An unused project is deleted and the backend's answer is returned unchanged.
        with self.subTest(i=1):
            project_id = str(uuid4())
            self.auth.get_project.return_value = {"_id": project_id, "name": "other-project-name"}
            self.auth.delete_project.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            self.db.get_list.return_value = []
            result = self.topic.delete(self.fake_session, project_id)
            self.assertEqual(result, {"deleted": 1}, "Wrong project deletion return info")
            looked_up = self.auth.get_project.call_args[0][0]
            self.assertEqual(looked_up, project_id, "Wrong project identifier")
            removed = self.auth.delete_project.call_args[0][0]
            self.assertEqual(removed, project_id, "Wrong project identifier")

    def test_conflict_on_del(self):
        # Case 1: the session's own project cannot be deleted.
        with self.subTest(i=1):
            self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name}
            with self.assertRaises(EngineException, msg="Accepted deletion of own project") as raised:
                self.topic.delete(self.fake_session, self.test_name)
            self._assert_engine_error(raised, HTTPStatus.CONFLICT, "you cannot delete your own project")
        # Case 2: project 'admin' cannot be deleted.
        with self.subTest(i=2):
            self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
            with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as raised:
                self.topic.delete(self.fake_session, "admin")
            self._assert_engine_error(raised, HTTPStatus.CONFLICT, "you cannot delete project 'admin'")
        # Case 3: a project still mapped to a user cannot be deleted.
        with self.subTest(i=3):
            used_pid = str(uuid4())
            used_name = "other-project-name"
            self.auth.get_project.return_value = {"_id": used_pid, "name": used_name}
            mapping = {"project": used_pid, "role": str(uuid4())}
            self.auth.get_user_list.return_value = [
                {"_id": str(uuid4()), "username": self.test_name, "project_role_mappings": [mapping]},
            ]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as raised:
                self.topic.delete(self.fake_session, used_pid)
            self._assert_engine_error(
                raised,
                HTTPStatus.CONFLICT,
                "project '{}' ({}) is being used by user '{}'".format(used_name, used_pid, self.test_name),
            )
        # Case 4: same project as case 3, now free of users but still referenced by a database record.
        with self.subTest(i=4):
            self.auth.get_user_list.return_value = []
            record_admin = {"projects_read": [used_pid], "projects_write": []}
            self.db.get_list.return_value = [
                {"_id": str(uuid4()), "id": self.test_name, "_admin": record_admin},
            ]
            with self.assertRaises(EngineException, msg="Accepted deletion of used project") as raised:
                self.topic.delete(self.fake_session, used_pid)
            self._assert_engine_error(
                raised,
                HTTPStatus.CONFLICT,
                "project '{}' ({}) is being used by {} '{}'"
                .format(used_name, used_pid, "vnf descriptor", self.test_name),
            )
326
327
class Test_RoleTopicAuth(TestCase):
    """Unit tests for ``RoleTopicAuth`` new/edit/delete and their conflict checks, using mocked backends."""

    @classmethod
    def setUpClass(cls):
        cls.test_name = "test-role-topic"
        # Installed in setUp as the mocked auth backend's role_permissions.
        cls.test_operations = ["tokens:get"]

    def setUp(self):
        # Fresh mocks per test so canned answers and recorded calls start clean.
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.auth.role_permissions = self.test_operations
        self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
        self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_role(self):
        # Case 1: a valid role is created; "default" and "admin" are expected as False in the stored permissions.
        with self.subTest(i=1):
            rollback = []
            rid1 = str(uuid4())
            perms_in = {"tokens": True}
            perms_out = {"default": False, "admin": False, "tokens": True}
            self.auth.get_role_list.return_value = []
            self.auth.create_role.return_value = rid1
            rid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in})
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            # Message corrected: the value under test is a role id, not a project id.
            self.assertEqual(rid2, rid1, "Wrong role identifier")
            content = self.auth.create_role.call_args[0][0]
            self.assertEqual(content["name"], self.test_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
        # Case 2: a permission that is not in test_operations is rejected.
        with self.subTest(i=2):
            rollback = []
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
                self.topic.new(rollback, self.fake_session,
                               {"name": "other-role-name", "permissions": {"projects": True}})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("invalid permission '{}'".format("projects"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_edit_role(self):
        now = time()
        rid = str(uuid4())
        role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True},
                "_admin": {"created": now, "modified": now}}
        # Case 1: rename plus valid permissions is forwarded to auth.update_role.
        with self.subTest(i=1):
            self.auth.get_role_list.side_effect = [[role], []]
            self.auth.get_role.return_value = role
            new_name = "new-role-name"
            perms_in = {"tokens": False, "tokens:get": True}
            perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True}
            self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            content = self.auth.update_role.call_args[0][0]
            self.assertEqual(content["_id"], rid, "Wrong role identifier")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
            self.assertEqual(content["name"], new_name, "Wrong role name")
            self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
        # Case 2: an operation that is not in test_operations is rejected.
        with self.subTest(i=2):
            new_name = "other-role-name"
            perms_in = {"tokens": False, "tokens:post": True}
            self.auth.get_role_list.side_effect = [[role], []]
            with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("invalid permission '{}'".format("tokens:post"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_role(self):
        # An unused role is deleted and the backend's answer is returned unchanged.
        with self.subTest(i=1):
            rid = str(uuid4())
            role = {"_id": rid, "name": "other-role-name"}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.delete_role.return_value = {"deleted": 1}
            self.auth.get_user_list.return_value = []
            rc = self.topic.delete(self.fake_session, rid)
            self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
            self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier")
            self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier")
            self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier")

    def test_conflict_on_new(self):
        # Case 1: a uuid-shaped role name is refused.
        with self.subTest(i=1):
            rollback = []
            rid = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
                self.topic.new(rollback, self.fake_session, {"name": rid})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("role name '{}' cannot have an uuid format".format(rid),
                          norm(str(e.exception)), "Wrong exception text")
        # Case 2: a name the auth backend already lists is refused.
        with self.subTest(i=2):
            rollback = []
            self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}]
            with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
                self.topic.new(rollback, self.fake_session, {"name": self.test_name})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role name '{}' exists".format(self.test_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        rid = str(uuid4())
        # Case 1: renaming to a uuid-shaped name is refused.
        with self.subTest(i=1):
            self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}]
            new_name = str(uuid4())
            with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("role name '{}' cannot have an uuid format".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")
        # Cases 2 and 3: the roles named below cannot be renamed.
        for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
            with self.subTest(i=i):
                rid = str(uuid4())
                self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}}
                with self.assertRaises(EngineException,
                                       msg="Accepted renaming of role '{}'".format(role_name)) as e:
                    self.topic.edit(self.fake_session, rid, {"name": "new-name"})
                self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
                self.assertIn("you cannot rename role '{}'".format(role_name),
                              norm(str(e.exception)), "Wrong exception text")
        # Last case (numbered after the loop's final index): the new name belongs to another role.
        with self.subTest(i=i+1):
            new_name = "new-role-name"
            self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}],
                                                   [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]]
            self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}}
            with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
                self.topic.edit(self.fake_session, rid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role name '{}' exists".format(new_name),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_del(self):
        # Cases 1 and 2: the roles named below cannot be deleted.
        for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
            with self.subTest(i=i):
                rid = str(uuid4())
                role = {"_id": rid, "name": role_name}
                self.auth.get_role_list.return_value = [role]
                self.auth.get_role.return_value = role
                with self.assertRaises(EngineException,
                                       msg="Accepted deletion of role '{}'".format(role_name)) as e:
                    self.topic.delete(self.fake_session, rid)
                self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
                self.assertIn("you cannot delete role '{}'".format(role_name),
                              norm(str(e.exception)), "Wrong exception text")
        # Last case (numbered after the loop's final index): a role still mapped to a user cannot be deleted.
        with self.subTest(i=i+1):
            rid = str(uuid4())
            name = "other-role-name"
            role = {"_id": rid, "name": name}
            self.auth.get_role_list.return_value = [role]
            self.auth.get_role.return_value = role
            self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
                                                     "project_role_mappings": [{"project": str(uuid4()),
                                                                                "role": rid}]}]
            with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e:
                self.topic.delete(self.fake_session, rid)
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name),
                          norm(str(e.exception)), "Wrong exception text")
489
490
491class Test_UserTopicAuth(TestCase):
492
493 @classmethod
494 def setUpClass(cls):
495 cls.test_name = "test-user-topic"
496
497 def setUp(self):
498 self.db = Mock(dbbase.DbBase())
499 self.fs = Mock(fsbase.FsBase())
500 self.msg = Mock(msgbase.MsgBase())
tierno9e87a7f2020-03-23 09:24:10 +0000501 self.auth = Mock(authconn.Authconn(None, None, None))
delacruzramo79e40f42019-10-10 16:36:40 +0200502 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
503 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
504 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
tiernod7749582020-05-28 10:41:10 +0000505 self.topic.check_quota = Mock(return_value=None) # skip quota
delacruzramo79e40f42019-10-10 16:36:40 +0200506
507 def test_new_user(self):
508 uid1 = str(uuid4())
509 pid = str(uuid4())
510 self.auth.get_user_list.return_value = []
511 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
512 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
513 with self.subTest(i=1):
514 rollback = []
515 rid = str(uuid4())
516 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
517 prms_in = [{"project": "some_project", "role": "some_role"}]
518 prms_out = [{"project": pid, "role": rid}]
519 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
520 "password": self.test_name,
521 "project_role_mappings": prms_in
522 })
523 self.assertEqual(len(rollback), 1, "Wrong rollback length")
524 self.assertEqual(uid2, uid1, "Wrong project identifier")
525 content = self.auth.create_user.call_args[0][0]
526 self.assertEqual(content["username"], self.test_name, "Wrong project name")
527 self.assertEqual(content["password"], self.test_name, "Wrong password")
528 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
529 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
530 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
531 with self.subTest(i=2):
532 rollback = []
533 def_rid = str(uuid4())
534 def_role = {"_id": def_rid, "name": "project_admin"}
535 self.auth.get_role.return_value = def_role
536 self.auth.get_role_list.return_value = [def_role]
537 prms_out = [{"project": pid, "role": def_rid}]
538 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
539 "password": self.test_name,
540 "projects": ["some_project"]
541 })
542 self.assertEqual(len(rollback), 1, "Wrong rollback length")
543 self.assertEqual(uid2, uid1, "Wrong project identifier")
544 content = self.auth.create_user.call_args[0][0]
545 self.assertEqual(content["username"], self.test_name, "Wrong project name")
546 self.assertEqual(content["password"], self.test_name, "Wrong password")
547 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
548 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
549 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
550 with self.subTest(i=3):
551 rollback = []
552 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
553 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
554 "password": "other-password",
555 "project_role_mappings": [{}]
556 })
557 self.assertEqual(len(rollback), 0, "Wrong rollback length")
558 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
559 self.assertIn("format error at '{}' '{}'"
560 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
561 norm(str(e.exception)), "Wrong exception text")
562 with self.subTest(i=4):
563 rollback = []
564 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
565 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
566 "password": "other-password",
567 "projects": []
568 })
569 self.assertEqual(len(rollback), 0, "Wrong rollback length")
570 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
571 self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]),
572 norm(str(e.exception)), "Wrong exception text")
573
574 def test_edit_user(self):
575 now = time()
576 uid = str(uuid4())
577 pid1 = str(uuid4())
578 rid1 = str(uuid4())
579 prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}]
580 user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms,
581 "_admin": {"created": now, "modified": now}}
582 with self.subTest(i=1):
583 self.auth.get_user_list.side_effect = [[user], []]
584 self.auth.get_user.return_value = user
585 pid2 = str(uuid4())
586 rid2 = str(uuid4())
587 self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"},
588 {"_id": pid1, "name": "project-1"}]
589 self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"},
590 {"_id": rid1, "name": "role-1"}]
591 new_name = "new-user-name"
592 new_pasw = "new-password"
593 add_prms = [{"project": pid2, "role": rid2}]
594 rem_prms = [{"project": pid1, "role": rid1}]
595 self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw,
596 "add_project_role_mappings": add_prms,
597 "remove_project_role_mappings": rem_prms
598 })
599 content = self.auth.update_user.call_args[0][0]
600 self.assertEqual(content["_id"], uid, "Wrong user identifier")
601 self.assertEqual(content["username"], new_name, "Wrong user name")
602 self.assertEqual(content["password"], new_pasw, "Wrong user password")
603 self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add")
604 self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove")
605 with self.subTest(i=2):
606 new_name = "other-user-name"
607 new_prms = [{}]
608 self.auth.get_role_list.side_effect = [[user], []]
Frank Brydendeba68e2020-07-27 13:55:11 +0000609 self.auth.get_user_list.side_effect = [[user]]
delacruzramo79e40f42019-10-10 16:36:40 +0200610 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
611 self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms})
612 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
613 self.assertIn("format error at '{}' '{}'"
614 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
615 norm(str(e.exception)), "Wrong exception text")
616
617 def test_delete_user(self):
618 with self.subTest(i=1):
619 uid = str(uuid4())
620 self.fake_session["username"] = self.test_name
621 user = user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []}
622 self.auth.get_user.return_value = user
623 self.auth.delete_user.return_value = {"deleted": 1}
624 rc = self.topic.delete(self.fake_session, uid)
625 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
626 self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier")
627 self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier")
628
def test_conflict_on_new(self):
    """Check the conflicts detected when creating a user.

    Three cases are exercised: a username with uuid format, a username that
    already exists, and a user for which no default role can be found.
    In every case nothing must have been added to the rollback list.
    """
    # Case 1: username with uuid format
    with self.subTest(i=1):
        undo_log = []
        uuid_name = str(uuid4())
        indata = {"username": uuid_name, "password": self.test_name, "projects": [test_pid]}
        with self.assertRaises(EngineException, msg="Accepted uuid as username") as ctx:
            self.topic.new(undo_log, self.fake_session, indata)
        error = ctx.exception
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(error.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        expected_text = "username '{}' cannot have a uuid format".format(uuid_name)
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")

    # Case 2: the auth backend already lists a user with the requested name
    with self.subTest(i=2):
        undo_log = []
        self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}]
        indata = {"username": self.test_name, "password": self.test_name, "projects": [test_pid]}
        with self.assertRaises(EngineException, msg="Accepted existing username") as ctx:
            self.topic.new(undo_log, self.fake_session, indata)
        error = ctx.exception
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(error.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
        expected_text = "username '{}' is already used".format(self.test_name)
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")

    # Case 3: no user clash, but the role lookups return nothing
    with self.subTest(i=3):
        undo_log = []
        self.auth.get_user_list.return_value = []
        self.auth.get_role_list.side_effect = [[], []]
        indata = {"username": self.test_name, "password": self.test_name, "projects": [str(uuid4())]}
        with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as ctx:
            self.topic.new(undo_log, self.fake_session, indata)
        error = ctx.exception
        self.assertEqual(len(undo_log), 0, "Wrong rollback length")
        self.assertEqual(error.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
        expected_text = "can't find default role for user '{}'".format(self.test_name)
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")
661
def test_conflict_on_edit(self):
    """Check the conflicts detected when editing a user.

    Four cases are exercised: renaming to a uuid-formatted username, a user
    for which no default role can be found, removing the system_admin role
    from the admin user, and renaming to a username that is already used.
    """
    target_id = str(uuid4())

    # Case 1: new username with uuid format
    with self.subTest(i=1):
        uuid_name = str(uuid4())
        self.auth.get_user_list.return_value = [{"_id": target_id, "username": self.test_name}]
        with self.assertRaises(EngineException, msg="Accepted uuid as username") as ctx:
            self.topic.edit(self.fake_session, target_id, {"username": uuid_name})
        error = ctx.exception
        self.assertEqual(error.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        expected_text = "username '{}' cannot have an uuid format".format(uuid_name)
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")

    # Case 2: the role lookups return nothing
    with self.subTest(i=2):
        self.auth.get_user_list.return_value = [{"_id": target_id, "username": self.test_name}]
        self.auth.get_role_list.side_effect = [[], []]
        indata = {"projects": [str(uuid4())]}
        with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as ctx:
            self.topic.edit(self.fake_session, target_id, indata)
        error = ctx.exception
        self.assertEqual(error.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
        expected_text = "can't find a default role for user '{}'".format(self.test_name)
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")

    # Case 3: removing system_admin from the admin user
    with self.subTest(i=3):
        admin_uid = str(uuid4())
        self.auth.get_user_list.return_value = [{"_id": admin_uid, "username": "admin"}]
        indata = {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]}
        with self.assertRaises(EngineException, msg="Accepted removing system_admin role from admin user") as ctx:
            self.topic.edit(self.fake_session, admin_uid, indata)
        error = ctx.exception
        self.assertEqual(error.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
        expected_text = "you cannot remove system_admin role from admin user"
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")

    # Case 4: consecutive user lookups return the edited user and then a different user
    with self.subTest(i=4):
        taken_name = "new-user-name"
        edited_user = {"_id": target_id, "name": self.test_name}
        other_user = {"_id": str(uuid4()), "name": taken_name}
        self.auth.get_user_list.side_effect = [[edited_user], [other_user]]
        with self.assertRaises(EngineException, msg="Accepted existing username") as ctx:
            self.topic.edit(self.fake_session, target_id, {"username": taken_name})
        error = ctx.exception
        self.assertEqual(error.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
        expected_text = "username '{}' is already used".format(taken_name)
        self.assertIn(expected_text, norm(str(error)), "Wrong exception text")
698
def test_conflict_on_del(self):
    """Check that a user is not allowed to delete its own login account.

    The session username and the stored user's username are both
    ``self.test_name``, so ``delete`` is expected to raise an
    ``EngineException`` with HTTP status CONFLICT.
    """
    with self.subTest(i=1):
        uid = str(uuid4())
        self.fake_session["username"] = self.test_name
        user = {"_id": uid, "username": self.test_name, "project_role_mappings": []}
        self.auth.get_user.return_value = user
        with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e:
            self.topic.delete(self.fake_session, uid)
        self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
        self.assertIn("you cannot delete your own login user", norm(str(e.exception)), "Wrong exception text")
709
710
class Test_CommonVimWimSdn(TestCase):
    """Unit tests for ``CommonVimWimSdn`` run against mocked db/fs/msg/auth backends.

    The topic under test is configured as "wims" with the WIM account schemas.
    "CIM" in names and messages stands for Common Infrastructure Manager.
    """

    @classmethod
    def setUpClass(cls):
        """Define the CIM name shared by all the tests of this class."""
        cls.test_name = "test-cim-topic"  # CIM = Common Infrastructure Manager

    def setUp(self):
        """Build a fresh topic with mocked backends and a fake admin session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
        # Message sending is replaced by a mock so that tests can inspect the calls
        self.topic._send_msg = Mock()
        # Use WIM schemas for testing because they are the simplest
        self.topic.topic = "wims"
        self.topic.schema_new = validation.wim_account_new_schema
        self.topic.schema_edit = validation.wim_account_edit_schema
        self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
                             "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_cvws(self):
        """Check the content stored in the database when a CIM is created."""
        test_url = "http://0.0.0.0:0"
        with self.subTest(i=1):
            rollback = []
            test_type = "fake"
            self.db.get_one.return_value = None
            # Make the mocked db.create return the identifier of the stored content
            self.db.create.side_effect = lambda _table, content: content["_id"]
            cid, oid = self.topic.new(rollback, self.fake_session,
                                      {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            args = self.db.create.call_args[0]
            content = args[1]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
            self.assertEqual(content["wim_url"], test_url, "Wrong URL")
            self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
            self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
            self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
            self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
            self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects")
            self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects")
            self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation")
            self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations")
            operation = content["_admin"]["operations"][0]
            self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], content["_admin"]["created"],
                               "Wrong operation status enter time")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
        # NOTE: the former sub-test that expected a wrong "wim_type" to be rejected is disabled.
        # From Feature 8030 we admit all WIM/SDN types.

    def test_conflict_on_new(self):
        """Check that creating a CIM with an already existing name is rejected."""
        with self.subTest(i=1):
            rollback = []
            test_url = "http://0.0.0.0:0"
            test_type = "fake"
            # The database reports an existing entry with the same name
            self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
            with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
                self.topic.new(rollback, self.fake_session,
                               {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic),
                          norm(str(e.exception)), "Wrong exception text")

    def test_edit_cvws(self):
        """Check the content written on a valid edit and the rejection of unknown properties."""
        now = time()
        cid = str(uuid4())
        test_url = "http://0.0.0.0:0"
        test_type = "fake"
        cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type,
                "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}}
        with self.subTest(i=1):
            new_name = "new-cim-name"
            new_url = "https://1.1.1.1:1"
            new_type = "onos"
            # First lookup returns the CIM being edited, the second one finds no entry
            self.db.get_one.side_effect = [cvws, None]
            self.db.replace.return_value = {"updated": 1}
            self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type})
            args = self.db.replace.call_args[0]
            content = args[2]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(args[1], cid, "Wrong CIM identifier")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], new_name, "Wrong CIM name")
            self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
            self.assertEqual(content["wim_url"], new_url, "Wrong URL")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
            self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations")
            operation = content["_admin"]["operations"][1]
            self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], content["_admin"]["modified"],
                               "Wrong operation status enter time")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
        with self.subTest(i=2):
            self.db.get_one.side_effect = [cvws]
            with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
                self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
            self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
            self.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)").
                          format("extra_prop"),
                          norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_edit(self):
        """Check that renaming a CIM to an already existing name is rejected."""
        with self.subTest(i=1):
            cid = str(uuid4())
            new_name = "new-cim-name"
            # First lookup returns the CIM being edited, the second one a different CIM with the new name
            self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name},
                                           {"_id": str(uuid4()), "name": new_name}]
            with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
                self.topic.edit(self.fake_session, cid, {"name": new_name})
            self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
            self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic),
                          norm(str(e.exception)), "Wrong exception text")

    def test_delete_cvws(self):
        """Check the three deletion outcomes asserted below.

        1. Other projects still reference the CIM: only the session project is pulled
           from the project lists and no message is sent.
        2. The session project is the only one: the CIM is marked to be deleted, a
           "delete" operation is pushed and a "delete" message is sent.
        3. Forced deletion by an admin session without project: the entry is removed
           from the database and a "deleted" message is sent.
        """
        cid = str(uuid4())
        ro_pid = str(uuid4())
        rw_pid = str(uuid4())
        cvws = {"_id": cid, "name": self.test_name}
        self.db.get_list.return_value = []
        with self.subTest(i=1):
            cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]}
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertIsNone(self.db.set_one.call_args[1]["update_dict"],
                              "Wrong read-only projects update")
            self.assertEqual(self.db.set_one.call_args[1]["pull_list"],
                             {"_admin.projects_read": (test_pid,), "_admin.projects_write": (test_pid,)},
                             "Wrong read/write projects update")
            self.topic._send_msg.assert_not_called()
        with self.subTest(i=2):
            now = time()
            cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.set_one.call_args[1]["update_dict"], {"_admin.to_delete": True},
                             "Wrong _admin.to_delete update")
            operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
            self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
            self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
            self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status")
            self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
            self.assertGreater(operation["startTime"], now, "Wrong operation start time")
            self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
            self.topic._send_msg.assert_called_once_with("delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None)
        with self.subTest(i=3):
            cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
            self.db.get_one.return_value = cvws
            # Forget the calls recorded by the previous sub-tests
            self.topic._send_msg.reset_mock()
            self.db.get_one.reset_mock()
            self.db.del_one.reset_mock()
            self.fake_session["force"] = True  # to force deletion
            self.fake_session["admin"] = True  # to force deletion
            self.fake_session["project_id"] = []  # to force deletion
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
            self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
            self.topic._send_msg.assert_called_once_with("deleted", {"_id": cid, "op_id": None}, not_send_msg=None)
905
# Run every test case defined in this module when the file is executed as a script
if __name__ == "__main__":
    unittest.main()