Coverage for osm_nbi/tests/test_admin_topics.py: 99%

747 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2024-06-27 02:46 +0000

1#! /usr/bin/python3 

2# -*- coding: utf-8 -*- 

3 

4# Licensed under the Apache License, Version 2.0 (the "License"); 

5# you may not use this file except in compliance with the License. 

6# You may obtain a copy of the License at 

7# 

8# http://www.apache.org/licenses/LICENSE-2.0 

9# 

10# Unless required by applicable law or agreed to in writing, software 

11# distributed under the License is distributed on an "AS IS" BASIS, 

12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 

13# implied. 

14# See the License for the specific language governing permissions and 

15# limitations under the License. 

16 

17__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com" 

18__date__ = "$2019-10-019" 

19 

20import unittest 

21import random 

22from unittest import TestCase 

23from unittest.mock import Mock, patch, call 

24from uuid import uuid4 

25from http import HTTPStatus 

26from time import time 

27from osm_common import dbbase, fsbase, msgbase 

28from osm_common.dbmemory import DbMemory 

29from osm_nbi import authconn, validation 

30from osm_nbi.admin_topics import ( 

31 ProjectTopicAuth, 

32 RoleTopicAuth, 

33 UserTopicAuth, 

34 CommonVimWimSdn, 

35 VcaTopic, 

36) 

37from osm_nbi.engine import EngineException 

38from osm_nbi.authconn import AuthconnNotFoundException 

39from osm_nbi.authconn_internal import AuthconnInternal 

40 

41 

42test_pid = str(uuid4()) 

43test_name = "test-user" 

44 

45 

46def norm(str): 

47 """Normalize string for checking""" 

48 return " ".join(str.strip().split()).lower() 

49 

50 

51class TestVcaTopic(TestCase): 

52 def setUp(self): 

53 self.db = Mock(dbbase.DbBase()) 

54 self.fs = Mock(fsbase.FsBase()) 

55 self.msg = Mock(msgbase.MsgBase()) 

56 self.auth = Mock(authconn.Authconn(None, None, None)) 

57 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth) 

58 

59 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new") 

60 def test_format_on_new(self, mock_super_format_on_new): 

61 content = { 

62 "_id": "id", 

63 "secret": "encrypted_secret", 

64 "cacert": "encrypted_cacert", 

65 } 

66 self.db.encrypt.side_effect = ["secret", "cacert"] 

67 mock_super_format_on_new.return_value = "1234" 

68 

69 oid = self.vca_topic.format_on_new(content) 

70 

71 self.assertEqual(oid, "1234") 

72 self.assertEqual(content["secret"], "secret") 

73 self.assertEqual(content["cacert"], "cacert") 

74 self.db.encrypt.assert_has_calls( 

75 [ 

76 call("encrypted_secret", schema_version="1.11", salt="id"), 

77 call("encrypted_cacert", schema_version="1.11", salt="id"), 

78 ] 

79 ) 

80 mock_super_format_on_new.assert_called_with(content, None, False) 

81 

82 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit") 

83 def test_format_on_edit(self, mock_super_format_on_edit): 

84 edit_content = { 

85 "_id": "id", 

86 "secret": "encrypted_secret", 

87 "cacert": "encrypted_cacert", 

88 } 

89 final_content = { 

90 "_id": "id", 

91 "schema_version": "1.11", 

92 } 

93 self.db.encrypt.side_effect = ["secret", "cacert"] 

94 mock_super_format_on_edit.return_value = "1234" 

95 

96 oid = self.vca_topic.format_on_edit(final_content, edit_content) 

97 

98 self.assertEqual(oid, "1234") 

99 self.assertEqual(final_content["secret"], "secret") 

100 self.assertEqual(final_content["cacert"], "cacert") 

101 self.db.encrypt.assert_has_calls( 

102 [ 

103 call("encrypted_secret", schema_version="1.11", salt="id"), 

104 call("encrypted_cacert", schema_version="1.11", salt="id"), 

105 ] 

106 ) 

107 mock_super_format_on_edit.assert_called() 

108 

109 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del") 

110 def test_check_conflict_on_del(self, mock_check_conflict_on_del): 

111 session = { 

112 "project_id": "project-id", 

113 "force": False, 

114 } 

115 _id = "vca-id" 

116 db_content = {} 

117 

118 self.db.get_list.return_value = None 

119 

120 self.vca_topic.check_conflict_on_del(session, _id, db_content) 

121 

122 self.db.get_list.assert_called_with( 

123 "vim_accounts", 

124 {"vca": _id, "_admin.projects_read.cont": "project-id"}, 

125 ) 

126 mock_check_conflict_on_del.assert_called_with(session, _id, db_content) 

127 

128 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del") 

129 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del): 

130 session = { 

131 "project_id": "project-id", 

132 "force": True, 

133 } 

134 _id = "vca-id" 

135 db_content = {} 

136 

137 self.vca_topic.check_conflict_on_del(session, _id, db_content) 

138 

139 self.db.get_list.assert_not_called() 

140 mock_check_conflict_on_del.assert_not_called() 

141 

142 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del") 

143 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del): 

144 session = { 

145 "project_id": "project-id", 

146 "force": False, 

147 } 

148 _id = "vca-id" 

149 db_content = {} 

150 

151 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"} 

152 

153 with self.assertRaises(EngineException) as context: 

154 self.vca_topic.check_conflict_on_del(session, _id, db_content) 

155 self.assertEqual( 

156 context.exception, 

157 EngineException( 

158 "There is at least one VIM account using this vca", 

159 http_code=HTTPStatus.CONFLICT, 

160 ), 

161 ) 

162 

163 self.db.get_list.assert_called_with( 

164 "vim_accounts", 

165 {"vca": _id, "_admin.projects_read.cont": "project-id"}, 

166 ) 

167 mock_check_conflict_on_del.assert_not_called() 

168 

169 

170class Test_ProjectTopicAuth(TestCase): 

171 @classmethod 

172 def setUpClass(cls): 

173 cls.test_name = "test-project-topic" 

174 

175 def setUp(self): 

176 self.db = Mock(dbbase.DbBase()) 

177 self.fs = Mock(fsbase.FsBase()) 

178 self.msg = Mock(msgbase.MsgBase()) 

179 self.auth = Mock(authconn.Authconn(None, None, None)) 

180 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth) 

181 self.fake_session = { 

182 "username": self.test_name, 

183 "project_id": (test_pid,), 

184 "method": None, 

185 "admin": True, 

186 "force": False, 

187 "public": False, 

188 "allow_show_user_project_role": True, 

189 } 

190 self.topic.check_quota = Mock(return_value=None) # skip quota 

191 

192 def test_new_project(self): 

193 with self.subTest(i=1): 

194 rollback = [] 

195 pid1 = str(uuid4()) 

196 self.auth.get_project_list.return_value = [] 

197 self.auth.create_project.return_value = pid1 

198 pid2, oid = self.topic.new( 

199 rollback, self.fake_session, {"name": self.test_name, "quotas": {}} 

200 ) 

201 self.assertEqual(len(rollback), 1, "Wrong rollback length") 

202 self.assertEqual(pid2, pid1, "Wrong project identifier") 

203 content = self.auth.create_project.call_args[0][0] 

204 self.assertEqual(content["name"], self.test_name, "Wrong project name") 

205 self.assertEqual(content["quotas"], {}, "Wrong quotas") 

206 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") 

207 self.assertEqual( 

208 content["_admin"]["modified"], 

209 content["_admin"]["created"], 

210 "Wrong modification time", 

211 ) 

212 with self.subTest(i=2): 

213 rollback = [] 

214 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e: 

215 self.topic.new( 

216 rollback, 

217 self.fake_session, 

218 {"name": "other-project-name", "quotas": {"baditems": 10}}, 

219 ) 

220 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

221 self.assertEqual( 

222 e.exception.http_code, 

223 HTTPStatus.UNPROCESSABLE_ENTITY, 

224 "Wrong HTTP status code", 

225 ) 

226 self.assertIn( 

227 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format( 

228 "baditems" 

229 ), 

230 norm(str(e.exception)), 

231 "Wrong exception text", 

232 ) 

233 

234 def test_edit_project(self): 

235 now = time() 

236 pid = str(uuid4()) 

237 proj = { 

238 "_id": pid, 

239 "name": self.test_name, 

240 "_admin": {"created": now, "modified": now}, 

241 } 

242 with self.subTest(i=1): 

243 self.auth.get_project_list.side_effect = [[proj], []] 

244 new_name = "new-project-name" 

245 quotas = { 

246 "vnfds": random.SystemRandom().randint(0, 100), 

247 "nsds": random.SystemRandom().randint(0, 100), 

248 } 

249 self.topic.edit( 

250 self.fake_session, pid, {"name": new_name, "quotas": quotas} 

251 ) 

252 _id, content = self.auth.update_project.call_args[0] 

253 self.assertEqual(_id, pid, "Wrong project identifier") 

254 self.assertEqual(content["_id"], pid, "Wrong project identifier") 

255 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") 

256 self.assertGreater( 

257 content["_admin"]["modified"], now, "Wrong modification time" 

258 ) 

259 self.assertEqual(content["name"], new_name, "Wrong project name") 

260 self.assertEqual(content["quotas"], quotas, "Wrong quotas") 

261 with self.subTest(i=2): 

262 new_name = "other-project-name" 

263 quotas = {"baditems": random.SystemRandom().randint(0, 100)} 

264 self.auth.get_project_list.side_effect = [[proj], []] 

265 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e: 

266 self.topic.edit( 

267 self.fake_session, pid, {"name": new_name, "quotas": quotas} 

268 ) 

269 self.assertEqual( 

270 e.exception.http_code, 

271 HTTPStatus.UNPROCESSABLE_ENTITY, 

272 "Wrong HTTP status code", 

273 ) 

274 self.assertIn( 

275 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format( 

276 "baditems" 

277 ), 

278 norm(str(e.exception)), 

279 "Wrong exception text", 

280 ) 

281 

282 def test_conflict_on_new(self): 

283 with self.subTest(i=1): 

284 rollback = [] 

285 pid = str(uuid4()) 

286 with self.assertRaises( 

287 EngineException, msg="Accepted uuid as project name" 

288 ) as e: 

289 self.topic.new(rollback, self.fake_session, {"name": pid}) 

290 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

291 self.assertEqual( 

292 e.exception.http_code, 

293 HTTPStatus.UNPROCESSABLE_ENTITY, 

294 "Wrong HTTP status code", 

295 ) 

296 self.assertIn( 

297 "project name '{}' cannot have an uuid format".format(pid), 

298 norm(str(e.exception)), 

299 "Wrong exception text", 

300 ) 

301 with self.subTest(i=2): 

302 rollback = [] 

303 self.auth.get_project_list.return_value = [ 

304 {"_id": test_pid, "name": self.test_name} 

305 ] 

306 with self.assertRaises( 

307 EngineException, msg="Accepted existing project name" 

308 ) as e: 

309 self.topic.new(rollback, self.fake_session, {"name": self.test_name}) 

310 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

311 self.assertEqual( 

312 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

313 ) 

314 self.assertIn( 

315 "project '{}' exists".format(self.test_name), 

316 norm(str(e.exception)), 

317 "Wrong exception text", 

318 ) 

319 

320 def test_conflict_on_edit(self): 

321 with self.subTest(i=1): 

322 self.auth.get_project_list.return_value = [ 

323 {"_id": test_pid, "name": self.test_name} 

324 ] 

325 new_name = str(uuid4()) 

326 with self.assertRaises( 

327 EngineException, msg="Accepted uuid as project name" 

328 ) as e: 

329 self.topic.edit(self.fake_session, test_pid, {"name": new_name}) 

330 self.assertEqual( 

331 e.exception.http_code, 

332 HTTPStatus.UNPROCESSABLE_ENTITY, 

333 "Wrong HTTP status code", 

334 ) 

335 self.assertIn( 

336 "project name '{}' cannot have an uuid format".format(new_name), 

337 norm(str(e.exception)), 

338 "Wrong exception text", 

339 ) 

340 with self.subTest(i=2): 

341 pid = str(uuid4()) 

342 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}] 

343 with self.assertRaises( 

344 EngineException, msg="Accepted renaming of project 'admin'" 

345 ) as e: 

346 self.topic.edit(self.fake_session, pid, {"name": "new-name"}) 

347 self.assertEqual( 

348 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

349 ) 

350 self.assertIn( 

351 "you cannot rename project 'admin'", 

352 norm(str(e.exception)), 

353 "Wrong exception text", 

354 ) 

355 with self.subTest(i=3): 

356 new_name = "new-project-name" 

357 self.auth.get_project_list.side_effect = [ 

358 [{"_id": test_pid, "name": self.test_name}], 

359 [{"_id": str(uuid4()), "name": new_name}], 

360 ] 

361 with self.assertRaises( 

362 EngineException, msg="Accepted existing project name" 

363 ) as e: 

364 self.topic.edit(self.fake_session, pid, {"name": new_name}) 

365 self.assertEqual( 

366 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

367 ) 

368 self.assertIn( 

369 "project '{}' is already used".format(new_name), 

370 norm(str(e.exception)), 

371 "Wrong exception text", 

372 ) 

373 

374 def test_delete_project(self): 

375 with self.subTest(i=1): 

376 pid = str(uuid4()) 

377 self.auth.get_project.return_value = { 

378 "_id": pid, 

379 "name": "other-project-name", 

380 } 

381 self.auth.delete_project.return_value = {"deleted": 1} 

382 self.auth.get_user_list.return_value = [] 

383 self.db.get_list.return_value = [] 

384 rc = self.topic.delete(self.fake_session, pid) 

385 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info") 

386 self.assertEqual( 

387 self.auth.get_project.call_args[0][0], pid, "Wrong project identifier" 

388 ) 

389 self.assertEqual( 

390 self.auth.delete_project.call_args[0][0], 

391 pid, 

392 "Wrong project identifier", 

393 ) 

394 

395 def test_conflict_on_del(self): 

396 with self.subTest(i=1): 

397 self.auth.get_project.return_value = { 

398 "_id": test_pid, 

399 "name": self.test_name, 

400 } 

401 with self.assertRaises( 

402 EngineException, msg="Accepted deletion of own project" 

403 ) as e: 

404 self.topic.delete(self.fake_session, self.test_name) 

405 self.assertEqual( 

406 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

407 ) 

408 self.assertIn( 

409 "you cannot delete your own project", 

410 norm(str(e.exception)), 

411 "Wrong exception text", 

412 ) 

413 with self.subTest(i=2): 

414 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"} 

415 with self.assertRaises( 

416 EngineException, msg="Accepted deletion of project 'admin'" 

417 ) as e: 

418 self.topic.delete(self.fake_session, "admin") 

419 self.assertEqual( 

420 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

421 ) 

422 self.assertIn( 

423 "you cannot delete project 'admin'", 

424 norm(str(e.exception)), 

425 "Wrong exception text", 

426 ) 

427 with self.subTest(i=3): 

428 pid = str(uuid4()) 

429 name = "other-project-name" 

430 self.auth.get_project.return_value = {"_id": pid, "name": name} 

431 self.auth.get_user_list.return_value = [ 

432 { 

433 "_id": str(uuid4()), 

434 "username": self.test_name, 

435 "project_role_mappings": [{"project": pid, "role": str(uuid4())}], 

436 } 

437 ] 

438 with self.assertRaises( 

439 EngineException, msg="Accepted deletion of used project" 

440 ) as e: 

441 self.topic.delete(self.fake_session, pid) 

442 self.assertEqual( 

443 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

444 ) 

445 self.assertIn( 

446 "project '{}' ({}) is being used by user '{}'".format( 

447 name, pid, self.test_name 

448 ), 

449 norm(str(e.exception)), 

450 "Wrong exception text", 

451 ) 

452 with self.subTest(i=4): 

453 self.auth.get_user_list.return_value = [] 

454 self.db.get_list.return_value = [ 

455 { 

456 "_id": str(uuid4()), 

457 "id": self.test_name, 

458 "_admin": {"projects_read": [pid], "projects_write": []}, 

459 } 

460 ] 

461 with self.assertRaises( 

462 EngineException, msg="Accepted deletion of used project" 

463 ) as e: 

464 self.topic.delete(self.fake_session, pid) 

465 self.assertEqual( 

466 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

467 ) 

468 self.assertIn( 

469 "project '{}' ({}) is being used by {} '{}'".format( 

470 name, pid, "vnf descriptor", self.test_name 

471 ), 

472 norm(str(e.exception)), 

473 "Wrong exception text", 

474 ) 

475 

476 

477class Test_RoleTopicAuth(TestCase): 

478 @classmethod 

479 def setUpClass(cls): 

480 cls.test_name = "test-role-topic" 

481 cls.test_operations = ["tokens:get"] 

482 

483 def setUp(self): 

484 self.db = Mock(dbbase.DbBase()) 

485 self.fs = Mock(fsbase.FsBase()) 

486 self.msg = Mock(msgbase.MsgBase()) 

487 self.auth = Mock(authconn.Authconn(None, None, None)) 

488 self.auth.role_permissions = self.test_operations 

489 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth) 

490 self.fake_session = { 

491 "username": test_name, 

492 "project_id": (test_pid,), 

493 "method": None, 

494 "admin": True, 

495 "force": False, 

496 "public": False, 

497 "allow_show_user_project_role": True, 

498 } 

499 self.topic.check_quota = Mock(return_value=None) # skip quota 

500 

501 def test_new_role(self): 

502 with self.subTest(i=1): 

503 rollback = [] 

504 rid1 = str(uuid4()) 

505 perms_in = {"tokens": True} 

506 perms_out = {"default": False, "admin": False, "tokens": True} 

507 self.auth.get_role_list.return_value = [] 

508 self.auth.create_role.return_value = rid1 

509 rid2, oid = self.topic.new( 

510 rollback, 

511 self.fake_session, 

512 {"name": self.test_name, "permissions": perms_in}, 

513 ) 

514 self.assertEqual(len(rollback), 1, "Wrong rollback length") 

515 self.assertEqual(rid2, rid1, "Wrong project identifier") 

516 content = self.auth.create_role.call_args[0][0] 

517 self.assertEqual(content["name"], self.test_name, "Wrong role name") 

518 self.assertEqual(content["permissions"], perms_out, "Wrong permissions") 

519 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") 

520 self.assertEqual( 

521 content["_admin"]["modified"], 

522 content["_admin"]["created"], 

523 "Wrong modification time", 

524 ) 

525 with self.subTest(i=2): 

526 rollback = [] 

527 with self.assertRaises( 

528 EngineException, msg="Accepted wrong permissions" 

529 ) as e: 

530 self.topic.new( 

531 rollback, 

532 self.fake_session, 

533 {"name": "other-role-name", "permissions": {"projects": True}}, 

534 ) 

535 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

536 self.assertEqual( 

537 e.exception.http_code, 

538 HTTPStatus.UNPROCESSABLE_ENTITY, 

539 "Wrong HTTP status code", 

540 ) 

541 self.assertIn( 

542 "invalid permission '{}'".format("projects"), 

543 norm(str(e.exception)), 

544 "Wrong exception text", 

545 ) 

546 

547 def test_edit_role(self): 

548 now = time() 

549 rid = str(uuid4()) 

550 role = { 

551 "_id": rid, 

552 "name": self.test_name, 

553 "permissions": {"tokens": True}, 

554 "_admin": {"created": now, "modified": now}, 

555 } 

556 with self.subTest(i=1): 

557 self.auth.get_role_list.side_effect = [[role], []] 

558 self.auth.get_role.return_value = role 

559 new_name = "new-role-name" 

560 perms_in = {"tokens": False, "tokens:get": True} 

561 perms_out = { 

562 "default": False, 

563 "admin": False, 

564 "tokens": False, 

565 "tokens:get": True, 

566 } 

567 self.topic.edit( 

568 self.fake_session, rid, {"name": new_name, "permissions": perms_in} 

569 ) 

570 content = self.auth.update_role.call_args[0][0] 

571 self.assertEqual(content["_id"], rid, "Wrong role identifier") 

572 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") 

573 self.assertGreater( 

574 content["_admin"]["modified"], now, "Wrong modification time" 

575 ) 

576 self.assertEqual(content["name"], new_name, "Wrong role name") 

577 self.assertEqual(content["permissions"], perms_out, "Wrong permissions") 

578 with self.subTest(i=2): 

579 new_name = "other-role-name" 

580 perms_in = {"tokens": False, "tokens:post": True} 

581 self.auth.get_role_list.side_effect = [[role], []] 

582 with self.assertRaises( 

583 EngineException, msg="Accepted wrong permissions" 

584 ) as e: 

585 self.topic.edit( 

586 self.fake_session, rid, {"name": new_name, "permissions": perms_in} 

587 ) 

588 self.assertEqual( 

589 e.exception.http_code, 

590 HTTPStatus.UNPROCESSABLE_ENTITY, 

591 "Wrong HTTP status code", 

592 ) 

593 self.assertIn( 

594 "invalid permission '{}'".format("tokens:post"), 

595 norm(str(e.exception)), 

596 "Wrong exception text", 

597 ) 

598 

599 def test_delete_role(self): 

600 with self.subTest(i=1): 

601 rid = str(uuid4()) 

602 role = {"_id": rid, "name": "other-role-name"} 

603 self.auth.get_role_list.return_value = [role] 

604 self.auth.get_role.return_value = role 

605 self.auth.delete_role.return_value = {"deleted": 1} 

606 self.auth.get_user_list.return_value = [] 

607 rc = self.topic.delete(self.fake_session, rid) 

608 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info") 

609 self.assertEqual( 

610 self.auth.get_role_list.call_args[0][0]["_id"], 

611 rid, 

612 "Wrong role identifier", 

613 ) 

614 self.assertEqual( 

615 self.auth.get_role.call_args[0][0], rid, "Wrong role identifier" 

616 ) 

617 self.assertEqual( 

618 self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier" 

619 ) 

620 

621 def test_conflict_on_new(self): 

622 with self.subTest(i=1): 

623 rollback = [] 

624 rid = str(uuid4()) 

625 with self.assertRaises( 

626 EngineException, msg="Accepted uuid as role name" 

627 ) as e: 

628 self.topic.new(rollback, self.fake_session, {"name": rid}) 

629 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

630 self.assertEqual( 

631 e.exception.http_code, 

632 HTTPStatus.UNPROCESSABLE_ENTITY, 

633 "Wrong HTTP status code", 

634 ) 

635 self.assertIn( 

636 "role name '{}' cannot have an uuid format".format(rid), 

637 norm(str(e.exception)), 

638 "Wrong exception text", 

639 ) 

640 with self.subTest(i=2): 

641 rollback = [] 

642 self.auth.get_role_list.return_value = [ 

643 {"_id": str(uuid4()), "name": self.test_name} 

644 ] 

645 with self.assertRaises( 

646 EngineException, msg="Accepted existing role name" 

647 ) as e: 

648 self.topic.new(rollback, self.fake_session, {"name": self.test_name}) 

649 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

650 self.assertEqual( 

651 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

652 ) 

653 self.assertIn( 

654 "role name '{}' exists".format(self.test_name), 

655 norm(str(e.exception)), 

656 "Wrong exception text", 

657 ) 

658 

659 def test_conflict_on_edit(self): 

660 rid = str(uuid4()) 

661 with self.subTest(i=1): 

662 self.auth.get_role_list.return_value = [ 

663 {"_id": rid, "name": self.test_name, "permissions": {}} 

664 ] 

665 new_name = str(uuid4()) 

666 with self.assertRaises( 

667 EngineException, msg="Accepted uuid as role name" 

668 ) as e: 

669 self.topic.edit(self.fake_session, rid, {"name": new_name}) 

670 self.assertEqual( 

671 e.exception.http_code, 

672 HTTPStatus.UNPROCESSABLE_ENTITY, 

673 "Wrong HTTP status code", 

674 ) 

675 self.assertIn( 

676 "role name '{}' cannot have an uuid format".format(new_name), 

677 norm(str(e.exception)), 

678 "Wrong exception text", 

679 ) 

680 for i, role_name in enumerate(["system_admin", "project_admin"], start=2): 

681 with self.subTest(i=i): 

682 rid = str(uuid4()) 

683 self.auth.get_role.return_value = { 

684 "_id": rid, 

685 "name": role_name, 

686 "permissions": {}, 

687 } 

688 with self.assertRaises( 

689 EngineException, 

690 msg="Accepted renaming of role '{}'".format(role_name), 

691 ) as e: 

692 self.topic.edit(self.fake_session, rid, {"name": "new-name"}) 

693 self.assertEqual( 

694 e.exception.http_code, 

695 HTTPStatus.FORBIDDEN, 

696 "Wrong HTTP status code", 

697 ) 

698 self.assertIn( 

699 "you cannot rename role '{}'".format(role_name), 

700 norm(str(e.exception)), 

701 "Wrong exception text", 

702 ) 

703 with self.subTest(i=i + 1): 

704 new_name = "new-role-name" 

705 self.auth.get_role_list.side_effect = [ 

706 [{"_id": rid, "name": self.test_name, "permissions": {}}], 

707 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}], 

708 ] 

709 self.auth.get_role.return_value = { 

710 "_id": rid, 

711 "name": self.test_name, 

712 "permissions": {}, 

713 } 

714 with self.assertRaises( 

715 EngineException, msg="Accepted existing role name" 

716 ) as e: 

717 self.topic.edit(self.fake_session, rid, {"name": new_name}) 

718 self.assertEqual( 

719 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

720 ) 

721 self.assertIn( 

722 "role name '{}' exists".format(new_name), 

723 norm(str(e.exception)), 

724 "Wrong exception text", 

725 ) 

726 

727 def test_conflict_on_del(self): 

728 for i, role_name in enumerate(["system_admin", "project_admin"], start=1): 

729 with self.subTest(i=i): 

730 rid = str(uuid4()) 

731 role = {"_id": rid, "name": role_name} 

732 self.auth.get_role_list.return_value = [role] 

733 self.auth.get_role.return_value = role 

734 with self.assertRaises( 

735 EngineException, 

736 msg="Accepted deletion of role '{}'".format(role_name), 

737 ) as e: 

738 self.topic.delete(self.fake_session, rid) 

739 self.assertEqual( 

740 e.exception.http_code, 

741 HTTPStatus.FORBIDDEN, 

742 "Wrong HTTP status code", 

743 ) 

744 self.assertIn( 

745 "you cannot delete role '{}'".format(role_name), 

746 norm(str(e.exception)), 

747 "Wrong exception text", 

748 ) 

749 with self.subTest(i=i + 1): 

750 rid = str(uuid4()) 

751 name = "other-role-name" 

752 role = {"_id": rid, "name": name} 

753 self.auth.get_role_list.return_value = [role] 

754 self.auth.get_role.return_value = role 

755 self.auth.get_user_list.return_value = [ 

756 { 

757 "_id": str(uuid4()), 

758 "username": self.test_name, 

759 "project_role_mappings": [{"project": str(uuid4()), "role": rid}], 

760 } 

761 ] 

762 with self.assertRaises( 

763 EngineException, msg="Accepted deletion of used role" 

764 ) as e: 

765 self.topic.delete(self.fake_session, rid) 

766 self.assertEqual( 

767 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

768 ) 

769 self.assertIn( 

770 "role '{}' ({}) is being used by user '{}'".format( 

771 name, rid, self.test_name 

772 ), 

773 norm(str(e.exception)), 

774 "Wrong exception text", 

775 ) 

776 

777 

778class Test_UserTopicAuth(TestCase): 

779 @classmethod 

780 def setUpClass(cls): 

781 cls.test_name = "test-user-topic" 

782 cls.password = "Test@123" 

783 

784 def setUp(self): 

785 # self.db = Mock(dbbase.DbBase()) 

786 self.db = DbMemory() 

787 self.fs = Mock(fsbase.FsBase()) 

788 self.msg = Mock(msgbase.MsgBase()) 

789 self.auth = Mock(authconn.Authconn(None, None, None)) 

790 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth) 

791 self.fake_session = { 

792 "username": test_name, 

793 "project_id": (test_pid,), 

794 "method": None, 

795 "admin": True, 

796 "force": False, 

797 "public": False, 

798 "allow_show_user_project_role": True, 

799 } 

800 self.topic.check_quota = Mock(return_value=None) # skip quota 

801 

802 def test_new_user(self): 

803 uid1 = str(uuid4()) 

804 pid = str(uuid4()) 

805 self.auth.get_user_list.return_value = [] 

806 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"} 

807 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name} 

808 with self.subTest(i=1): 

809 rollback = [] 

810 rid = str(uuid4()) 

811 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"} 

812 prms_in = [{"project": "some_project", "role": "some_role"}] 

813 prms_out = [{"project": pid, "role": rid}] 

814 uid2, oid = self.topic.new( 

815 rollback, 

816 self.fake_session, 

817 { 

818 "username": self.test_name, 

819 "password": self.password, 

820 "project_role_mappings": prms_in, 

821 }, 

822 ) 

823 self.assertEqual(len(rollback), 1, "Wrong rollback length") 

824 self.assertEqual(uid2, uid1, "Wrong project identifier") 

825 content = self.auth.create_user.call_args[0][0] 

826 self.assertEqual(content["username"], self.test_name, "Wrong project name") 

827 self.assertEqual(content["password"], self.password, "Wrong password") 

828 self.assertEqual( 

829 content["project_role_mappings"], 

830 prms_out, 

831 "Wrong project-role mappings", 

832 ) 

833 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") 

834 self.assertEqual( 

835 content["_admin"]["modified"], 

836 content["_admin"]["created"], 

837 "Wrong modification time", 

838 ) 

839 with self.subTest(i=2): 

840 rollback = [] 

841 def_rid = str(uuid4()) 

842 def_role = {"_id": def_rid, "name": "project_admin"} 

843 self.auth.get_role.return_value = def_role 

844 self.auth.get_role_list.return_value = [def_role] 

845 prms_out = [{"project": pid, "role": def_rid}] 

846 uid2, oid = self.topic.new( 

847 rollback, 

848 self.fake_session, 

849 { 

850 "username": self.test_name, 

851 "password": self.password, 

852 "projects": ["some_project"], 

853 }, 

854 ) 

855 self.assertEqual(len(rollback), 1, "Wrong rollback length") 

856 self.assertEqual(uid2, uid1, "Wrong project identifier") 

857 content = self.auth.create_user.call_args[0][0] 

858 self.assertEqual(content["username"], self.test_name, "Wrong project name") 

859 self.assertEqual(content["password"], self.password, "Wrong password") 

860 self.assertEqual( 

861 content["project_role_mappings"], 

862 prms_out, 

863 "Wrong project-role mappings", 

864 ) 

865 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") 

866 self.assertEqual( 

867 content["_admin"]["modified"], 

868 content["_admin"]["created"], 

869 "Wrong modification time", 

870 ) 

871 with self.subTest(i=3): 

872 rollback = [] 

873 with self.assertRaises( 

874 EngineException, msg="Accepted wrong project-role mappings" 

875 ) as e: 

876 self.topic.new( 

877 rollback, 

878 self.fake_session, 

879 { 

880 "username": "other-project-name", 

881 "password": "Other@pwd1", 

882 "project_role_mappings": [{}], 

883 }, 

884 ) 

885 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

886 self.assertEqual( 

887 e.exception.http_code, 

888 HTTPStatus.UNPROCESSABLE_ENTITY, 

889 "Wrong HTTP status code", 

890 ) 

891 self.assertIn( 

892 "format error at '{}' '{}'".format( 

893 "project_role_mappings:{}", "'{}' is a required property" 

894 ).format(0, "project"), 

895 norm(str(e.exception)), 

896 "Wrong exception text", 

897 ) 

898 with self.subTest(i=4): 

899 rollback = [] 

900 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e: 

901 self.topic.new( 

902 rollback, 

903 self.fake_session, 

904 { 

905 "username": "other-project-name", 

906 "password": "Other@pwd1", 

907 "projects": [], 

908 }, 

909 ) 

910 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

911 self.assertEqual( 

912 e.exception.http_code, 

913 HTTPStatus.UNPROCESSABLE_ENTITY, 

914 "Wrong HTTP status code", 

915 ) 

916 self.assertIn( 

917 "format error at '{}' '{}'".format( 

918 "projects", "{} is too short" 

919 ).format([]), 

920 norm(str(e.exception)), 

921 "Wrong exception text", 

922 ) 

923 

924 def test_edit_user(self): 

925 now = time() 

926 uid = str(uuid4()) 

927 pid1 = str(uuid4()) 

928 rid1 = str(uuid4()) 

929 prms = [ 

930 { 

931 "project": pid1, 

932 "project_name": "project-1", 

933 "role": rid1, 

934 "role_name": "role-1", 

935 } 

936 ] 

937 user = { 

938 "_id": uid, 

939 "username": self.test_name, 

940 "project_role_mappings": prms, 

941 "_admin": {"created": now, "modified": now}, 

942 } 

943 with self.subTest(i=1): 

944 self.auth.get_user_list.side_effect = [[user], []] 

945 self.auth.get_user.return_value = user 

946 pid2 = str(uuid4()) 

947 rid2 = str(uuid4()) 

948 self.auth.get_project.side_effect = [ 

949 {"_id": pid2, "name": "project-2"}, 

950 {"_id": pid1, "name": "project-1"}, 

951 ] 

952 self.auth.get_role.side_effect = [ 

953 {"_id": rid2, "name": "role-2"}, 

954 {"_id": rid1, "name": "role-1"}, 

955 ] 

956 new_name = "new-user-name" 

957 new_pasw = "New@pwd1" 

958 add_prms = [{"project": pid2, "role": rid2}] 

959 rem_prms = [{"project": pid1, "role": rid1}] 

960 self.topic.edit( 

961 self.fake_session, 

962 uid, 

963 { 

964 "username": new_name, 

965 "password": new_pasw, 

966 "add_project_role_mappings": add_prms, 

967 "remove_project_role_mappings": rem_prms, 

968 }, 

969 ) 

970 content = self.auth.update_user.call_args[0][0] 

971 self.assertEqual(content["_id"], uid, "Wrong user identifier") 

972 self.assertEqual(content["username"], new_name, "Wrong user name") 

973 self.assertEqual(content["password"], new_pasw, "Wrong user password") 

974 self.assertEqual( 

975 content["add_project_role_mappings"], 

976 add_prms, 

977 "Wrong project-role mappings to add", 

978 ) 

979 self.assertEqual( 

980 content["remove_project_role_mappings"], 

981 prms, 

982 "Wrong project-role mappings to remove", 

983 ) 

984 with self.subTest(i=2): 

985 new_name = "other-user-name" 

986 new_prms = [{}] 

987 self.auth.get_role_list.side_effect = [[user], []] 

988 self.auth.get_user_list.side_effect = [[user]] 

989 with self.assertRaises( 

990 EngineException, msg="Accepted wrong project-role mappings" 

991 ) as e: 

992 self.topic.edit( 

993 self.fake_session, 

994 uid, 

995 {"username": new_name, "project_role_mappings": new_prms}, 

996 ) 

997 self.assertEqual( 

998 e.exception.http_code, 

999 HTTPStatus.UNPROCESSABLE_ENTITY, 

1000 "Wrong HTTP status code", 

1001 ) 

1002 self.assertIn( 

1003 "format error at '{}' '{}'".format( 

1004 "project_role_mappings:{}", "'{}' is a required property" 

1005 ).format(0, "project"), 

1006 norm(str(e.exception)), 

1007 "Wrong exception text", 

1008 ) 

1009 with self.subTest(i=3): 

1010 self.auth.get_user_list.side_effect = [[user], []] 

1011 self.auth.get_user.return_value = user 

1012 old_password = self.password 

1013 new_pasw = "New@pwd1" 

1014 self.topic.edit( 

1015 self.fake_session, 

1016 uid, 

1017 { 

1018 "old_password": old_password, 

1019 "password": new_pasw, 

1020 }, 

1021 ) 

1022 content = self.auth.update_user.call_args[0][0] 

1023 self.assertEqual( 

1024 content["old_password"], old_password, "Wrong old password" 

1025 ) 

1026 self.assertEqual(content["password"], new_pasw, "Wrong user password") 

1027 

1028 def test_delete_user(self): 

1029 with self.subTest(i=1): 

1030 uid = str(uuid4()) 

1031 self.fake_session["username"] = self.test_name 

1032 user = user = { 

1033 "_id": uid, 

1034 "username": "other-user-name", 

1035 "project_role_mappings": [], 

1036 } 

1037 self.auth.get_user.return_value = user 

1038 self.auth.delete_user.return_value = {"deleted": 1} 

1039 rc = self.topic.delete(self.fake_session, uid) 

1040 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info") 

1041 self.assertEqual( 

1042 self.auth.get_user.call_args[0][0], uid, "Wrong user identifier" 

1043 ) 

1044 self.assertEqual( 

1045 self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier" 

1046 ) 

1047 

1048 def test_conflict_on_new(self): 

1049 with self.subTest(i=1): 

1050 rollback = [] 

1051 uid = str(uuid4()) 

1052 with self.assertRaises( 

1053 EngineException, msg="Accepted uuid as username" 

1054 ) as e: 

1055 self.topic.new( 

1056 rollback, 

1057 self.fake_session, 

1058 { 

1059 "username": uid, 

1060 "password": self.password, 

1061 "projects": [test_pid], 

1062 }, 

1063 ) 

1064 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

1065 self.assertEqual( 

1066 e.exception.http_code, 

1067 HTTPStatus.UNPROCESSABLE_ENTITY, 

1068 "Wrong HTTP status code", 

1069 ) 

1070 self.assertIn( 

1071 "username '{}' cannot have a uuid format".format(uid), 

1072 norm(str(e.exception)), 

1073 "Wrong exception text", 

1074 ) 

1075 with self.subTest(i=2): 

1076 rollback = [] 

1077 self.auth.get_user_list.return_value = [ 

1078 {"_id": str(uuid4()), "username": self.test_name} 

1079 ] 

1080 with self.assertRaises( 

1081 EngineException, msg="Accepted existing username" 

1082 ) as e: 

1083 self.topic.new( 

1084 rollback, 

1085 self.fake_session, 

1086 { 

1087 "username": self.test_name, 

1088 "password": self.password, 

1089 "projects": [test_pid], 

1090 }, 

1091 ) 

1092 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

1093 self.assertEqual( 

1094 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

1095 ) 

1096 self.assertIn( 

1097 "username '{}' is already used".format(self.test_name), 

1098 norm(str(e.exception)), 

1099 "Wrong exception text", 

1100 ) 

1101 with self.subTest(i=3): 

1102 rollback = [] 

1103 self.auth.get_user_list.return_value = [] 

1104 self.auth.get_role_list.side_effect = [[], []] 

1105 with self.assertRaises( 

1106 AuthconnNotFoundException, msg="Accepted user without default role" 

1107 ) as e: 

1108 self.topic.new( 

1109 rollback, 

1110 self.fake_session, 

1111 { 

1112 "username": self.test_name, 

1113 "password": self.password, 

1114 "projects": [str(uuid4())], 

1115 }, 

1116 ) 

1117 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

1118 self.assertEqual( 

1119 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code" 

1120 ) 

1121 self.assertIn( 

1122 "can't find default role for user '{}'".format(self.test_name), 

1123 norm(str(e.exception)), 

1124 "Wrong exception text", 

1125 ) 

1126 

1127 def test_conflict_on_edit(self): 

1128 uid = str(uuid4()) 

1129 with self.subTest(i=1): 

1130 self.auth.get_user_list.return_value = [ 

1131 {"_id": uid, "username": self.test_name} 

1132 ] 

1133 new_name = str(uuid4()) 

1134 with self.assertRaises( 

1135 EngineException, msg="Accepted uuid as username" 

1136 ) as e: 

1137 self.topic.edit(self.fake_session, uid, {"username": new_name}) 

1138 self.assertEqual( 

1139 e.exception.http_code, 

1140 HTTPStatus.UNPROCESSABLE_ENTITY, 

1141 "Wrong HTTP status code", 

1142 ) 

1143 self.assertIn( 

1144 "username '{}' cannot have an uuid format".format(new_name), 

1145 norm(str(e.exception)), 

1146 "Wrong exception text", 

1147 ) 

1148 with self.subTest(i=2): 

1149 self.auth.get_user_list.return_value = [ 

1150 {"_id": uid, "username": self.test_name} 

1151 ] 

1152 self.auth.get_role_list.side_effect = [[], []] 

1153 with self.assertRaises( 

1154 AuthconnNotFoundException, msg="Accepted user without default role" 

1155 ) as e: 

1156 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]}) 

1157 self.assertEqual( 

1158 e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code" 

1159 ) 

1160 self.assertIn( 

1161 "can't find a default role for user '{}'".format(self.test_name), 

1162 norm(str(e.exception)), 

1163 "Wrong exception text", 

1164 ) 

1165 with self.subTest(i=3): 

1166 admin_uid = str(uuid4()) 

1167 self.auth.get_user_list.return_value = [ 

1168 {"_id": admin_uid, "username": "admin"} 

1169 ] 

1170 with self.assertRaises( 

1171 EngineException, 

1172 msg="Accepted removing system_admin role from admin user", 

1173 ) as e: 

1174 self.topic.edit( 

1175 self.fake_session, 

1176 admin_uid, 

1177 { 

1178 "remove_project_role_mappings": [ 

1179 {"project": "admin", "role": "system_admin"} 

1180 ] 

1181 }, 

1182 ) 

1183 self.assertEqual( 

1184 e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code" 

1185 ) 

1186 self.assertIn( 

1187 "you cannot remove system_admin role from admin user", 

1188 norm(str(e.exception)), 

1189 "Wrong exception text", 

1190 ) 

1191 with self.subTest(i=4): 

1192 new_name = "new-user-name" 

1193 self.auth.get_user_list.side_effect = [ 

1194 [{"_id": uid, "name": self.test_name}], 

1195 [{"_id": str(uuid4()), "name": new_name}], 

1196 ] 

1197 with self.assertRaises( 

1198 EngineException, msg="Accepted existing username" 

1199 ) as e: 

1200 self.topic.edit(self.fake_session, uid, {"username": new_name}) 

1201 self.assertEqual( 

1202 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

1203 ) 

1204 self.assertIn( 

1205 "username '{}' is already used".format(new_name), 

1206 norm(str(e.exception)), 

1207 "Wrong exception text", 

1208 ) 

1209 

1210 def test_conflict_on_del(self): 

1211 with self.subTest(i=1): 

1212 uid = str(uuid4()) 

1213 self.fake_session["username"] = self.test_name 

1214 user = user = { 

1215 "_id": uid, 

1216 "username": self.test_name, 

1217 "project_role_mappings": [], 

1218 } 

1219 self.auth.get_user.return_value = user 

1220 with self.assertRaises( 

1221 EngineException, msg="Accepted deletion of own user" 

1222 ) as e: 

1223 self.topic.delete(self.fake_session, uid) 

1224 self.assertEqual( 

1225 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

1226 ) 

1227 self.assertIn( 

1228 "you cannot delete your own login user", 

1229 norm(str(e.exception)), 

1230 "Wrong exception text", 

1231 ) 

1232 

1233 def test_user_management(self): 

1234 self.config = { 

1235 "user_management": True, 

1236 "pwd_expire_days": 30, 

1237 "max_pwd_attempt": 5, 

1238 "account_expire_days": 90, 

1239 "version": "dev", 

1240 "deviceVendor": "test", 

1241 "deviceProduct": "test", 

1242 } 

1243 self.permissions = {"admin": True, "default": True} 

1244 now = time() 

1245 rid = str(uuid4()) 

1246 role = { 

1247 "_id": rid, 

1248 "name": self.test_name, 

1249 "permissions": self.permissions, 

1250 "_admin": {"created": now, "modified": now}, 

1251 } 

1252 self.db.create("roles", role) 

1253 admin_user = { 

1254 "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500", 

1255 "username": "admin", 

1256 "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb", 

1257 "_admin": { 

1258 "created": 1663058370.7721832, 

1259 "modified": 1663681183.5651639, 

1260 "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b", 

1261 "last_token_time": 1666876472.2962265, 

1262 "user_status": "always-active", 

1263 "retry_count": 0, 

1264 }, 

1265 "project_role_mappings": [ 

1266 {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid} 

1267 ], 

1268 } 

1269 self.db.create("users", admin_user) 

1270 with self.subTest(i=1): 

1271 self.user_create = AuthconnInternal(self.config, self.db, self.permissions) 

1272 user_info = {"username": "user_mgmt_true", "password": "Test@123"} 

1273 self.user_create.create_user(user_info) 

1274 user = self.db.get_one("users", {"username": user_info["username"]}) 

1275 self.assertEqual(user["username"], user_info["username"], "Wrong user name") 

1276 self.assertEqual( 

1277 user["_admin"]["user_status"], "active", "User status is unknown" 

1278 ) 

1279 self.assertIn("password_expire_time", user["_admin"], "Key is not there") 

1280 self.assertIn("account_expire_time", user["_admin"], "Key is not there") 

1281 with self.subTest(i=2): 

1282 self.user_update = AuthconnInternal(self.config, self.db, self.permissions) 

1283 locked_user = { 

1284 "username": "user_lock", 

1285 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d", 

1286 "_admin": { 

1287 "created": 1667207552.2191198, 

1288 "modified": 1667207552.2191815, 

1289 "salt": "560a5d51b1d64bb4b9cae0ccff3f1102", 

1290 "user_status": "locked", 

1291 "password_expire_time": 1667207552.2191815, 

1292 "account_expire_time": 1674983552.2191815, 

1293 "retry_count": 5, 

1294 "last_token_time": 1667207552.2191815, 

1295 }, 

1296 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6", 

1297 } 

1298 self.db.create("users", locked_user) 

1299 user_info = { 

1300 "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6", 

1301 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500", 

1302 "unlock": True, 

1303 } 

1304 self.assertEqual( 

1305 locked_user["_admin"]["user_status"], "locked", "User status is unknown" 

1306 ) 

1307 self.user_update.update_user(user_info) 

1308 user = self.db.get_one("users", {"username": locked_user["username"]}) 

1309 self.assertEqual( 

1310 user["username"], locked_user["username"], "Wrong user name" 

1311 ) 

1312 self.assertEqual( 

1313 user["_admin"]["user_status"], "active", "User status is unknown" 

1314 ) 

1315 self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown") 

1316 with self.subTest(i=3): 

1317 self.user_update = AuthconnInternal(self.config, self.db, self.permissions) 

1318 expired_user = { 

1319 "username": "user_expire", 

1320 "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d", 

1321 "_admin": { 

1322 "created": 1665602087.601298, 

1323 "modified": 1665636442.1245084, 

1324 "salt": "560a5d51b1d64bb4b9cae0ccff3f1102", 

1325 "user_status": "expired", 

1326 "password_expire_time": 1668248628.2191815, 

1327 "account_expire_time": 1666952628.2191815, 

1328 "retry_count": 0, 

1329 "last_token_time": 1666779828.2171815, 

1330 }, 

1331 "_id": "3266430f-8222-407f-b08f-3a242504ab94", 

1332 } 

1333 self.db.create("users", expired_user) 

1334 user_info = { 

1335 "_id": "3266430f-8222-407f-b08f-3a242504ab94", 

1336 "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500", 

1337 "renew": True, 

1338 } 

1339 self.assertEqual( 

1340 expired_user["_admin"]["user_status"], 

1341 "expired", 

1342 "User status is unknown", 

1343 ) 

1344 self.user_update.update_user(user_info) 

1345 user = self.db.get_one("users", {"username": expired_user["username"]}) 

1346 self.assertEqual( 

1347 user["username"], expired_user["username"], "Wrong user name" 

1348 ) 

1349 self.assertEqual( 

1350 user["_admin"]["user_status"], "active", "User status is unknown" 

1351 ) 

1352 self.assertGreater( 

1353 user["_admin"]["account_expire_time"], 

1354 expired_user["_admin"]["account_expire_time"], 

1355 "User expire time is not get extended", 

1356 ) 

1357 with self.subTest(i=4): 

1358 self.config.update({"user_management": False}) 

1359 self.user_create = AuthconnInternal(self.config, self.db, self.permissions) 

1360 user_info = {"username": "user_mgmt_false", "password": "Test@123"} 

1361 self.user_create.create_user(user_info) 

1362 user = self.db.get_one("users", {"username": user_info["username"]}) 

1363 self.assertEqual(user["username"], user_info["username"], "Wrong user name") 

1364 self.assertEqual( 

1365 user["_admin"]["user_status"], "active", "User status is unknown" 

1366 ) 

1367 self.assertNotIn("password_expire_time", user["_admin"], "Key is not there") 

1368 self.assertNotIn("account_expire_time", user["_admin"], "Key is not there") 

1369 

1370 

1371class Test_CommonVimWimSdn(TestCase): 

1372 @classmethod 

1373 def setUpClass(cls): 

1374 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager 

1375 

1376 def setUp(self): 

1377 self.db = Mock(dbbase.DbBase()) 

1378 self.fs = Mock(fsbase.FsBase()) 

1379 self.msg = Mock(msgbase.MsgBase()) 

1380 self.auth = Mock(authconn.Authconn(None, None, None)) 

1381 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth) 

1382 # Use WIM schemas for testing because they are the simplest 

1383 self.topic._send_msg = Mock() 

1384 self.topic.topic = "wims" 

1385 self.topic.schema_new = validation.wim_account_new_schema 

1386 self.topic.schema_edit = validation.wim_account_edit_schema 

1387 self.fake_session = { 

1388 "username": test_name, 

1389 "project_id": (test_pid,), 

1390 "method": None, 

1391 "admin": True, 

1392 "force": False, 

1393 "public": False, 

1394 "allow_show_user_project_role": True, 

1395 } 

1396 self.topic.check_quota = Mock(return_value=None) # skip quota 

1397 

1398 def test_new_cvws(self): 

1399 test_url = "http://0.0.0.0:0" 

1400 with self.subTest(i=1): 

1401 rollback = [] 

1402 test_type = "fake" 

1403 self.db.get_one.return_value = None 

1404 self.db.create.side_effect = lambda self, content: content["_id"] 

1405 cid, oid = self.topic.new( 

1406 rollback, 

1407 self.fake_session, 

1408 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type}, 

1409 ) 

1410 self.assertEqual(len(rollback), 1, "Wrong rollback length") 

1411 args = self.db.create.call_args[0] 

1412 content = args[1] 

1413 self.assertEqual(args[0], self.topic.topic, "Wrong topic") 

1414 self.assertEqual(content["_id"], cid, "Wrong CIM identifier") 

1415 self.assertEqual(content["name"], self.test_name, "Wrong CIM name") 

1416 self.assertEqual(content["wim_url"], test_url, "Wrong URL") 

1417 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type") 

1418 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version") 

1419 self.assertEqual(content["op_id"], oid, "Wrong operation identifier") 

1420 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time") 

1421 self.assertEqual( 

1422 content["_admin"]["modified"], 

1423 content["_admin"]["created"], 

1424 "Wrong modification time", 

1425 ) 

1426 self.assertEqual( 

1427 content["_admin"]["operationalState"], 

1428 "PROCESSING", 

1429 "Wrong operational state", 

1430 ) 

1431 self.assertEqual( 

1432 content["_admin"]["projects_read"], 

1433 [test_pid], 

1434 "Wrong read-only projects", 

1435 ) 

1436 self.assertEqual( 

1437 content["_admin"]["projects_write"], 

1438 [test_pid], 

1439 "Wrong read/write projects", 

1440 ) 

1441 self.assertIsNone( 

1442 content["_admin"]["current_operation"], "Wrong current operation" 

1443 ) 

1444 self.assertEqual( 

1445 len(content["_admin"]["operations"]), 1, "Wrong number of operations" 

1446 ) 

1447 operation = content["_admin"]["operations"][0] 

1448 self.assertEqual( 

1449 operation["lcmOperationType"], "create", "Wrong operation type" 

1450 ) 

1451 self.assertEqual( 

1452 operation["operationState"], "PROCESSING", "Wrong operation state" 

1453 ) 

1454 self.assertGreater( 

1455 operation["startTime"], 

1456 content["_admin"]["created"], 

1457 "Wrong operation start time", 

1458 ) 

1459 self.assertGreater( 

1460 operation["statusEnteredTime"], 

1461 content["_admin"]["created"], 

1462 "Wrong operation status enter time", 

1463 ) 

1464 self.assertEqual( 

1465 operation["detailed-status"], "", "Wrong operation detailed status info" 

1466 ) 

1467 self.assertIsNone( 

1468 operation["operationParams"], "Wrong operation parameters" 

1469 ) 

1470 # This test is disabled. From Feature 8030 we admit all WIM/SDN types 

1471 # with self.subTest(i=2): 

1472 # rollback = [] 

1473 # test_type = "bad_type" 

1474 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e: 

1475 # self.topic.new(rollback, self.fake_session, 

1476 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type}) 

1477 # self.assertEqual(len(rollback), 0, "Wrong rollback length") 

1478 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code") 

1479 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""), 

1480 # norm(str(e.exception)), "Wrong exception text") 

1481 

1482 def test_conflict_on_new(self): 

1483 with self.subTest(i=1): 

1484 rollback = [] 

1485 test_url = "http://0.0.0.0:0" 

1486 test_type = "fake" 

1487 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name} 

1488 with self.assertRaises( 

1489 EngineException, msg="Accepted existing CIM name" 

1490 ) as e: 

1491 self.topic.new( 

1492 rollback, 

1493 self.fake_session, 

1494 { 

1495 "name": self.test_name, 

1496 "wim_url": test_url, 

1497 "wim_type": test_type, 

1498 }, 

1499 ) 

1500 self.assertEqual(len(rollback), 0, "Wrong rollback length") 

1501 self.assertEqual( 

1502 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

1503 ) 

1504 self.assertIn( 

1505 "name '{}' already exists for {}".format( 

1506 self.test_name, self.topic.topic 

1507 ), 

1508 norm(str(e.exception)), 

1509 "Wrong exception text", 

1510 ) 

1511 

1512 def test_edit_cvws(self): 

1513 now = time() 

1514 cid = str(uuid4()) 

1515 test_url = "http://0.0.0.0:0" 

1516 test_type = "fake" 

1517 cvws = { 

1518 "_id": cid, 

1519 "name": self.test_name, 

1520 "wim_url": test_url, 

1521 "wim_type": test_type, 

1522 "_admin": { 

1523 "created": now, 

1524 "modified": now, 

1525 "operations": [{"lcmOperationType": "create"}], 

1526 }, 

1527 } 

1528 with self.subTest(i=1): 

1529 new_name = "new-cim-name" 

1530 new_url = "https://1.1.1.1:1" 

1531 new_type = "onos" 

1532 self.db.get_one.side_effect = [cvws, None] 

1533 self.db.replace.return_value = {"updated": 1} 

1534 # self.db.encrypt.side_effect = [b64str(), b64str()] 

1535 self.topic.edit( 

1536 self.fake_session, 

1537 cid, 

1538 {"name": new_name, "wim_url": new_url, "wim_type": new_type}, 

1539 ) 

1540 args = self.db.replace.call_args[0] 

1541 content = args[2] 

1542 self.assertEqual(args[0], self.topic.topic, "Wrong topic") 

1543 self.assertEqual(args[1], cid, "Wrong CIM identifier") 

1544 self.assertEqual(content["_id"], cid, "Wrong CIM identifier") 

1545 self.assertEqual(content["name"], new_name, "Wrong CIM name") 

1546 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type") 

1547 self.assertEqual(content["wim_url"], new_url, "Wrong URL") 

1548 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time") 

1549 self.assertGreater( 

1550 content["_admin"]["modified"], 

1551 content["_admin"]["created"], 

1552 "Wrong modification time", 

1553 ) 

1554 self.assertEqual( 

1555 len(content["_admin"]["operations"]), 2, "Wrong number of operations" 

1556 ) 

1557 operation = content["_admin"]["operations"][1] 

1558 self.assertEqual( 

1559 operation["lcmOperationType"], "edit", "Wrong operation type" 

1560 ) 

1561 self.assertEqual( 

1562 operation["operationState"], "PROCESSING", "Wrong operation state" 

1563 ) 

1564 self.assertGreater( 

1565 operation["startTime"], 

1566 content["_admin"]["modified"], 

1567 "Wrong operation start time", 

1568 ) 

1569 self.assertGreater( 

1570 operation["statusEnteredTime"], 

1571 content["_admin"]["modified"], 

1572 "Wrong operation status enter time", 

1573 ) 

1574 self.assertEqual( 

1575 operation["detailed-status"], "", "Wrong operation detailed status info" 

1576 ) 

1577 self.assertIsNone( 

1578 operation["operationParams"], "Wrong operation parameters" 

1579 ) 

1580 with self.subTest(i=2): 

1581 self.db.get_one.side_effect = [cvws] 

1582 with self.assertRaises(EngineException, msg="Accepted wrong property") as e: 

1583 self.topic.edit( 

1584 self.fake_session, 

1585 str(uuid4()), 

1586 {"name": "new-name", "extra_prop": "anything"}, 

1587 ) 

1588 self.assertEqual( 

1589 e.exception.http_code, 

1590 HTTPStatus.UNPROCESSABLE_ENTITY, 

1591 "Wrong HTTP status code", 

1592 ) 

1593 self.assertIn( 

1594 "format error '{}'".format( 

1595 "additional properties are not allowed ('{}' was unexpected)" 

1596 ).format("extra_prop"), 

1597 norm(str(e.exception)), 

1598 "Wrong exception text", 

1599 ) 

1600 

1601 def test_conflict_on_edit(self): 

1602 with self.subTest(i=1): 

1603 cid = str(uuid4()) 

1604 new_name = "new-cim-name" 

1605 self.db.get_one.side_effect = [ 

1606 {"_id": cid, "name": self.test_name}, 

1607 {"_id": str(uuid4()), "name": new_name}, 

1608 ] 

1609 with self.assertRaises( 

1610 EngineException, msg="Accepted existing CIM name" 

1611 ) as e: 

1612 self.topic.edit(self.fake_session, cid, {"name": new_name}) 

1613 self.assertEqual( 

1614 e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code" 

1615 ) 

1616 self.assertIn( 

1617 "name '{}' already exists for {}".format(new_name, self.topic.topic), 

1618 norm(str(e.exception)), 

1619 "Wrong exception text", 

1620 ) 

1621 

1622 def test_delete_cvws(self): 

1623 cid = str(uuid4()) 

1624 ro_pid = str(uuid4()) 

1625 rw_pid = str(uuid4()) 

1626 cvws = {"_id": cid, "name": self.test_name} 

1627 self.db.get_list.return_value = [] 

1628 with self.subTest(i=1): 

1629 cvws["_admin"] = { 

1630 "projects_read": [test_pid, ro_pid, rw_pid], 

1631 "projects_write": [test_pid, rw_pid], 

1632 } 

1633 self.db.get_one.return_value = cvws 

1634 oid = self.topic.delete(self.fake_session, cid) 

1635 self.assertIsNone(oid, "Wrong operation identifier") 

1636 self.assertEqual( 

1637 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1638 ) 

1639 self.assertEqual( 

1640 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" 

1641 ) 

1642 self.assertEqual( 

1643 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1644 ) 

1645 self.assertEqual( 

1646 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" 

1647 ) 

1648 self.assertEqual( 

1649 self.db.set_one.call_args[1]["update_dict"], 

1650 None, 

1651 "Wrong read-only projects update", 

1652 ) 

1653 self.assertEqual( 

1654 self.db.set_one.call_args[1]["pull_list"], 

1655 { 

1656 "_admin.projects_read": (test_pid,), 

1657 "_admin.projects_write": (test_pid,), 

1658 }, 

1659 "Wrong read/write projects update", 

1660 ) 

1661 self.topic._send_msg.assert_not_called() 

1662 with self.subTest(i=2): 

1663 now = time() 

1664 cvws["_admin"] = { 

1665 "projects_read": [test_pid], 

1666 "projects_write": [test_pid], 

1667 "operations": [], 

1668 } 

1669 self.db.get_one.return_value = cvws 

1670 oid = self.topic.delete(self.fake_session, cid) 

1671 self.assertEqual(oid, cid + ":0", "Wrong operation identifier") 

1672 self.assertEqual( 

1673 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1674 ) 

1675 self.assertEqual( 

1676 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" 

1677 ) 

1678 self.assertEqual( 

1679 self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1680 ) 

1681 self.assertEqual( 

1682 self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier" 

1683 ) 

1684 self.assertEqual( 

1685 self.db.set_one.call_args[1]["update_dict"], 

1686 {"_admin.to_delete": True}, 

1687 "Wrong _admin.to_delete update", 

1688 ) 

1689 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"] 

1690 self.assertEqual( 

1691 operation["lcmOperationType"], "delete", "Wrong operation type" 

1692 ) 

1693 self.assertEqual( 

1694 operation["operationState"], "PROCESSING", "Wrong operation state" 

1695 ) 

1696 self.assertEqual( 

1697 operation["detailed-status"], "", "Wrong operation detailed status" 

1698 ) 

1699 self.assertIsNone( 

1700 operation["operationParams"], "Wrong operation parameters" 

1701 ) 

1702 self.assertGreater( 

1703 operation["startTime"], now, "Wrong operation start time" 

1704 ) 

1705 self.assertGreater( 

1706 operation["statusEnteredTime"], now, "Wrong operation status enter time" 

1707 ) 

1708 self.topic._send_msg.assert_called_once_with( 

1709 "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None 

1710 ) 

1711 with self.subTest(i=3): 

1712 cvws["_admin"] = { 

1713 "projects_read": [], 

1714 "projects_write": [], 

1715 "operations": [], 

1716 } 

1717 self.db.get_one.return_value = cvws 

1718 self.topic._send_msg.reset_mock() 

1719 self.db.get_one.reset_mock() 

1720 self.db.del_one.reset_mock() 

1721 self.fake_session["force"] = True # to force deletion 

1722 self.fake_session["admin"] = True # to force deletion 

1723 self.fake_session["project_id"] = [] # to force deletion 

1724 oid = self.topic.delete(self.fake_session, cid) 

1725 self.assertIsNone(oid, "Wrong operation identifier") 

1726 self.assertEqual( 

1727 self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1728 ) 

1729 self.assertEqual( 

1730 self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" 

1731 ) 

1732 self.assertEqual( 

1733 self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic" 

1734 ) 

1735 self.assertEqual( 

1736 self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier" 

1737 ) 

1738 self.topic._send_msg.assert_called_once_with( 

1739 "deleted", {"_id": cid, "op_id": None}, not_send_msg=None 

1740 ) 

1741 

1742 

1743if __name__ == "__main__": 

1744 unittest.main()