2 # -*- coding: utf-8 -*-
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 __author__
= "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__
= "$2019-10-019"
22 from unittest
import TestCase
23 from unittest
.mock
import Mock
, patch
, call
24 from uuid
import uuid4
25 from http
import HTTPStatus
27 from osm_common
import dbbase
, fsbase
, msgbase
28 from osm_common
.dbmemory
import DbMemory
29 from osm_nbi
import authconn
, validation
30 from osm_nbi
.admin_topics
import (
37 from osm_nbi
.engine
import EngineException
38 from osm_nbi
.authconn
import AuthconnNotFoundException
39 from osm_nbi
.authconn_internal
import AuthconnInternal
# Module-wide fixtures used when building the fake sessions in the test cases:
# the user name reported by the session, and a random project identifier.
test_name = "test-user"
test_pid = str(uuid4())
def norm(str):
    """Normalize a string for checking.

    Collapses every run of whitespace into a single space, drops leading and
    trailing whitespace, and lowercases the result, so that messages can be
    compared without depending on spacing or letter case.
    """
    # split() with no separator already ignores leading/trailing whitespace.
    words = str.split()
    return " ".join(words).lower()
51 class TestVcaTopic(TestCase
):
53 self
.db
= Mock(dbbase
.DbBase())
54 self
.fs
= Mock(fsbase
.FsBase())
55 self
.msg
= Mock(msgbase
.MsgBase())
56 self
.auth
= Mock(authconn
.Authconn(None, None, None))
57 self
.vca_topic
= VcaTopic(self
.db
, self
.fs
, self
.msg
, self
.auth
)
59 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
60 def test_format_on_new(self
, mock_super_format_on_new
):
63 "secret": "encrypted_secret",
64 "cacert": "encrypted_cacert",
66 self
.db
.encrypt
.side_effect
= ["secret", "cacert"]
67 mock_super_format_on_new
.return_value
= "1234"
69 oid
= self
.vca_topic
.format_on_new(content
)
71 self
.assertEqual(oid
, "1234")
72 self
.assertEqual(content
["secret"], "secret")
73 self
.assertEqual(content
["cacert"], "cacert")
74 self
.db
.encrypt
.assert_has_calls(
76 call("encrypted_secret", schema_version
="1.11", salt
="id"),
77 call("encrypted_cacert", schema_version
="1.11", salt
="id"),
80 mock_super_format_on_new
.assert_called_with(content
, None, False)
82 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
83 def test_format_on_edit(self
, mock_super_format_on_edit
):
86 "secret": "encrypted_secret",
87 "cacert": "encrypted_cacert",
91 "schema_version": "1.11",
93 self
.db
.encrypt
.side_effect
= ["secret", "cacert"]
94 mock_super_format_on_edit
.return_value
= "1234"
96 oid
= self
.vca_topic
.format_on_edit(final_content
, edit_content
)
98 self
.assertEqual(oid
, "1234")
99 self
.assertEqual(final_content
["secret"], "secret")
100 self
.assertEqual(final_content
["cacert"], "cacert")
101 self
.db
.encrypt
.assert_has_calls(
103 call("encrypted_secret", schema_version
="1.11", salt
="id"),
104 call("encrypted_cacert", schema_version
="1.11", salt
="id"),
107 mock_super_format_on_edit
.assert_called()
109 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
110 def test_check_conflict_on_del(self
, mock_check_conflict_on_del
):
112 "project_id": "project-id",
118 self
.db
.get_list
.return_value
= None
120 self
.vca_topic
.check_conflict_on_del(session
, _id
, db_content
)
122 self
.db
.get_list
.assert_called_with(
124 {"vca": _id
, "_admin.projects_read.cont": "project-id"},
126 mock_check_conflict_on_del
.assert_called_with(session
, _id
, db_content
)
128 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
129 def test_check_conflict_on_del_force(self
, mock_check_conflict_on_del
):
131 "project_id": "project-id",
137 self
.vca_topic
.check_conflict_on_del(session
, _id
, db_content
)
139 self
.db
.get_list
.assert_not_called()
140 mock_check_conflict_on_del
.assert_not_called()
142 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
143 def test_check_conflict_on_del_with_conflict(self
, mock_check_conflict_on_del
):
145 "project_id": "project-id",
151 self
.db
.get_list
.return_value
= {"_id": "vim", "vca": "vca-id"}
153 with self
.assertRaises(EngineException
) as context
:
154 self
.vca_topic
.check_conflict_on_del(session
, _id
, db_content
)
158 "There is at least one VIM account using this vca",
159 http_code
=HTTPStatus
.CONFLICT
,
163 self
.db
.get_list
.assert_called_with(
165 {"vca": _id
, "_admin.projects_read.cont": "project-id"},
167 mock_check_conflict_on_del
.assert_not_called()
170 class Test_ProjectTopicAuth(TestCase
):
173 cls
.test_name
= "test-project-topic"
176 self
.db
= Mock(dbbase
.DbBase())
177 self
.fs
= Mock(fsbase
.FsBase())
178 self
.msg
= Mock(msgbase
.MsgBase())
179 self
.auth
= Mock(authconn
.Authconn(None, None, None))
180 self
.topic
= ProjectTopicAuth(self
.db
, self
.fs
, self
.msg
, self
.auth
)
181 self
.fake_session
= {
182 "username": self
.test_name
,
183 "project_id": (test_pid
,),
188 "allow_show_user_project_role": True,
190 self
.topic
.check_quota
= Mock(return_value
=None) # skip quota
192 def test_new_project(self
):
193 with self
.subTest(i
=1):
196 self
.auth
.get_project_list
.return_value
= []
197 self
.auth
.create_project
.return_value
= pid1
198 pid2
, oid
= self
.topic
.new(
199 rollback
, self
.fake_session
, {"name": self
.test_name
, "quotas": {}}
201 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
202 self
.assertEqual(pid2
, pid1
, "Wrong project identifier")
203 content
= self
.auth
.create_project
.call_args
[0][0]
204 self
.assertEqual(content
["name"], self
.test_name
, "Wrong project name")
205 self
.assertEqual(content
["quotas"], {}, "Wrong quotas")
206 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
208 content
["_admin"]["modified"],
209 content
["_admin"]["created"],
210 "Wrong modification time",
212 with self
.subTest(i
=2):
214 with self
.assertRaises(EngineException
, msg
="Accepted wrong quotas") as e
:
218 {"name": "other-project-name", "quotas": {"baditems": 10}},
220 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
222 e
.exception
.http_code
,
223 HTTPStatus
.UNPROCESSABLE_ENTITY
,
224 "Wrong HTTP status code",
227 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
230 norm(str(e
.exception
)),
231 "Wrong exception text",
234 def test_edit_project(self
):
239 "name": self
.test_name
,
240 "_admin": {"created": now
, "modified": now
},
242 with self
.subTest(i
=1):
243 self
.auth
.get_project_list
.side_effect
= [[proj
], []]
244 new_name
= "new-project-name"
246 "vnfds": random
.SystemRandom().randint(0, 100),
247 "nsds": random
.SystemRandom().randint(0, 100),
250 self
.fake_session
, pid
, {"name": new_name
, "quotas": quotas
}
252 _id
, content
= self
.auth
.update_project
.call_args
[0]
253 self
.assertEqual(_id
, pid
, "Wrong project identifier")
254 self
.assertEqual(content
["_id"], pid
, "Wrong project identifier")
255 self
.assertEqual(content
["_admin"]["created"], now
, "Wrong creation time")
257 content
["_admin"]["modified"], now
, "Wrong modification time"
259 self
.assertEqual(content
["name"], new_name
, "Wrong project name")
260 self
.assertEqual(content
["quotas"], quotas
, "Wrong quotas")
261 with self
.subTest(i
=2):
262 new_name
= "other-project-name"
263 quotas
= {"baditems": random
.SystemRandom().randint(0, 100)}
264 self
.auth
.get_project_list
.side_effect
= [[proj
], []]
265 with self
.assertRaises(EngineException
, msg
="Accepted wrong quotas") as e
:
267 self
.fake_session
, pid
, {"name": new_name
, "quotas": quotas
}
270 e
.exception
.http_code
,
271 HTTPStatus
.UNPROCESSABLE_ENTITY
,
272 "Wrong HTTP status code",
275 "format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'".format(
278 norm(str(e
.exception
)),
279 "Wrong exception text",
282 def test_conflict_on_new(self
):
283 with self
.subTest(i
=1):
286 with self
.assertRaises(
287 EngineException
, msg
="Accepted uuid as project name"
289 self
.topic
.new(rollback
, self
.fake_session
, {"name": pid
})
290 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
292 e
.exception
.http_code
,
293 HTTPStatus
.UNPROCESSABLE_ENTITY
,
294 "Wrong HTTP status code",
297 "project name '{}' cannot have an uuid format".format(pid
),
298 norm(str(e
.exception
)),
299 "Wrong exception text",
301 with self
.subTest(i
=2):
303 self
.auth
.get_project_list
.return_value
= [
304 {"_id": test_pid
, "name": self
.test_name
}
306 with self
.assertRaises(
307 EngineException
, msg
="Accepted existing project name"
309 self
.topic
.new(rollback
, self
.fake_session
, {"name": self
.test_name
})
310 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
312 e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code"
315 "project '{}' exists".format(self
.test_name
),
316 norm(str(e
.exception
)),
317 "Wrong exception text",
320 def test_conflict_on_edit(self
):
321 with self
.subTest(i
=1):
322 self
.auth
.get_project_list
.return_value
= [
323 {"_id": test_pid
, "name": self
.test_name
}
325 new_name
= str(uuid4())
326 with self
.assertRaises(
327 EngineException
, msg
="Accepted uuid as project name"
329 self
.topic
.edit(self
.fake_session
, test_pid
, {"name": new_name
})
331 e
.exception
.http_code
,
332 HTTPStatus
.UNPROCESSABLE_ENTITY
,
333 "Wrong HTTP status code",
336 "project name '{}' cannot have an uuid format".format(new_name
),
337 norm(str(e
.exception
)),
338 "Wrong exception text",
340 with self
.subTest(i
=2):
342 self
.auth
.get_project_list
.return_value
= [{"_id": pid
, "name": "admin"}]
343 with self
.assertRaises(
344 EngineException
, msg
="Accepted renaming of project 'admin'"
346 self
.topic
.edit(self
.fake_session
, pid
, {"name": "new-name"})
348 e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code"
351 "you cannot rename project 'admin'",
352 norm(str(e
.exception
)),
353 "Wrong exception text",
355 with self
.subTest(i
=3):
356 new_name
= "new-project-name"
357 self
.auth
.get_project_list
.side_effect
= [
358 [{"_id": test_pid
, "name": self
.test_name
}],
359 [{"_id": str(uuid4()), "name": new_name
}],
361 with self
.assertRaises(
362 EngineException
, msg
="Accepted existing project name"
364 self
.topic
.edit(self
.fake_session
, pid
, {"name": new_name
})
366 e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code"
369 "project '{}' is already used".format(new_name
),
370 norm(str(e
.exception
)),
371 "Wrong exception text",
374 def test_delete_project(self
):
375 with self
.subTest(i
=1):
377 self
.auth
.get_project
.return_value
= {
379 "name": "other-project-name",
381 self
.auth
.delete_project
.return_value
= {"deleted": 1}
382 self
.auth
.get_user_list
.return_value
= []
383 self
.db
.get_list
.return_value
= []
384 rc
= self
.topic
.delete(self
.fake_session
, pid
)
385 self
.assertEqual(rc
, {"deleted": 1}, "Wrong project deletion return info")
387 self
.auth
.get_project
.call_args
[0][0], pid
, "Wrong project identifier"
390 self
.auth
.delete_project
.call_args
[0][0],
392 "Wrong project identifier",
395 def test_conflict_on_del(self
):
396 with self
.subTest(i
=1):
397 self
.auth
.get_project
.return_value
= {
399 "name": self
.test_name
,
401 with self
.assertRaises(
402 EngineException
, msg
="Accepted deletion of own project"
404 self
.topic
.delete(self
.fake_session
, self
.test_name
)
406 e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code"
409 "you cannot delete your own project",
410 norm(str(e
.exception
)),
411 "Wrong exception text",
413 with self
.subTest(i
=2):
414 self
.auth
.get_project
.return_value
= {"_id": str(uuid4()), "name": "admin"}
415 with self
.assertRaises(
416 EngineException
, msg
="Accepted deletion of project 'admin'"
418 self
.topic
.delete(self
.fake_session
, "admin")
420 e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code"
423 "you cannot delete project 'admin'",
424 norm(str(e
.exception
)),
425 "Wrong exception text",
427 with self
.subTest(i
=3):
429 name
= "other-project-name"
430 self
.auth
.get_project
.return_value
= {"_id": pid
, "name": name
}
431 self
.auth
.get_user_list
.return_value
= [
434 "username": self
.test_name
,
435 "project_role_mappings": [{"project": pid
, "role": str(uuid4())}],
438 with self
.assertRaises(
439 EngineException
, msg
="Accepted deletion of used project"
441 self
.topic
.delete(self
.fake_session
, pid
)
443 e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code"
446 "project '{}' ({}) is being used by user '{}'".format(
447 name
, pid
, self
.test_name
449 norm(str(e
.exception
)),
450 "Wrong exception text",
452 with self
.subTest(i
=4):
453 self
.auth
.get_user_list
.return_value
= []
454 self
.db
.get_list
.return_value
= [
457 "id": self
.test_name
,
458 "_admin": {"projects_read": [pid
], "projects_write": []},
461 with self
.assertRaises(
462 EngineException
, msg
="Accepted deletion of used project"
464 self
.topic
.delete(self
.fake_session
, pid
)
466 e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code"
469 "project '{}' ({}) is being used by {} '{}'".format(
470 name
, pid
, "vnf descriptor", self
.test_name
472 norm(str(e
.exception
)),
473 "Wrong exception text",
477 class Test_RoleTopicAuth(TestCase
):
480 cls
.test_name
= "test-role-topic"
481 cls
.test_operations
= ["tokens:get"]
484 self
.db
= Mock(dbbase
.DbBase())
485 self
.fs
= Mock(fsbase
.FsBase())
486 self
.msg
= Mock(msgbase
.MsgBase())
487 self
.auth
= Mock(authconn
.Authconn(None, None, None))
488 self
.auth
.role_permissions
= self
.test_operations
489 self
.topic
= RoleTopicAuth(self
.db
, self
.fs
, self
.msg
, self
.auth
)
490 self
.fake_session
= {
491 "username": test_name
,
492 "project_id": (test_pid
,),
497 "allow_show_user_project_role": True,
499 self
.topic
.check_quota
= Mock(return_value
=None) # skip quota
501 def test_new_role(self
):
502 with self
.subTest(i
=1):
505 perms_in
= {"tokens": True}
506 perms_out
= {"default": False, "admin": False, "tokens": True}
507 self
.auth
.get_role_list
.return_value
= []
508 self
.auth
.create_role
.return_value
= rid1
509 rid2
, oid
= self
.topic
.new(
512 {"name": self
.test_name
, "permissions": perms_in
},
514 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
515 self
.assertEqual(rid2
, rid1
, "Wrong project identifier")
516 content
= self
.auth
.create_role
.call_args
[0][0]
517 self
.assertEqual(content
["name"], self
.test_name
, "Wrong role name")
518 self
.assertEqual(content
["permissions"], perms_out
, "Wrong permissions")
519 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
521 content
["_admin"]["modified"],
522 content
["_admin"]["created"],
523 "Wrong modification time",
525 with self
.subTest(i
=2):
527 with self
.assertRaises(
528 EngineException
, msg
="Accepted wrong permissions"
533 {"name": "other-role-name", "permissions": {"projects": True}},
535 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
537 e
.exception
.http_code
,
538 HTTPStatus
.UNPROCESSABLE_ENTITY
,
539 "Wrong HTTP status code",
542 "invalid permission '{}'".format("projects"),
543 norm(str(e
.exception
)),
544 "Wrong exception text",
547 def test_edit_role(self
):
552 "name": self
.test_name
,
553 "permissions": {"tokens": True},
554 "_admin": {"created": now
, "modified": now
},
556 with self
.subTest(i
=1):
557 self
.auth
.get_role_list
.side_effect
= [[role
], []]
558 self
.auth
.get_role
.return_value
= role
559 new_name
= "new-role-name"
560 perms_in
= {"tokens": False, "tokens:get": True}
568 self
.fake_session
, rid
, {"name": new_name
, "permissions": perms_in
}
570 content
= self
.auth
.update_role
.call_args
[0][0]
571 self
.assertEqual(content
["_id"], rid
, "Wrong role identifier")
572 self
.assertEqual(content
["_admin"]["created"], now
, "Wrong creation time")
574 content
["_admin"]["modified"], now
, "Wrong modification time"
576 self
.assertEqual(content
["name"], new_name
, "Wrong role name")
577 self
.assertEqual(content
["permissions"], perms_out
, "Wrong permissions")
578 with self
.subTest(i
=2):
579 new_name
= "other-role-name"
580 perms_in
= {"tokens": False, "tokens:post": True}
581 self
.auth
.get_role_list
.side_effect
= [[role
], []]
582 with self
.assertRaises(
583 EngineException
, msg
="Accepted wrong permissions"
586 self
.fake_session
, rid
, {"name": new_name
, "permissions": perms_in
}
589 e
.exception
.http_code
,
590 HTTPStatus
.UNPROCESSABLE_ENTITY
,
591 "Wrong HTTP status code",
594 "invalid permission '{}'".format("tokens:post"),
595 norm(str(e
.exception
)),
596 "Wrong exception text",
599 def test_delete_role(self
):
600 with self
.subTest(i
=1):
602 role
= {"_id": rid
, "name": "other-role-name"}
603 self
.auth
.get_role_list
.return_value
= [role
]
604 self
.auth
.get_role
.return_value
= role
605 self
.auth
.delete_role
.return_value
= {"deleted": 1}
606 self
.auth
.get_user_list
.return_value
= []
607 rc
= self
.topic
.delete(self
.fake_session
, rid
)
608 self
.assertEqual(rc
, {"deleted": 1}, "Wrong role deletion return info")
610 self
.auth
.get_role_list
.call_args
[0][0]["_id"],
612 "Wrong role identifier",
615 self
.auth
.get_role
.call_args
[0][0], rid
, "Wrong role identifier"
618 self
.auth
.delete_role
.call_args
[0][0], rid
, "Wrong role identifier"
621 def test_conflict_on_new(self
):
622 with self
.subTest(i
=1):
625 with self
.assertRaises(
626 EngineException
, msg
="Accepted uuid as role name"
628 self
.topic
.new(rollback
, self
.fake_session
, {"name": rid
})
629 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
631 e
.exception
.http_code
,
632 HTTPStatus
.UNPROCESSABLE_ENTITY
,
633 "Wrong HTTP status code",
636 "role name '{}' cannot have an uuid format".format(rid
),
637 norm(str(e
.exception
)),
638 "Wrong exception text",
640 with self
.subTest(i
=2):
642 self
.auth
.get_role_list
.return_value
= [
643 {"_id": str(uuid4()), "name": self
.test_name
}
645 with self
.assertRaises(
646 EngineException
, msg
="Accepted existing role name"
648 self
.topic
.new(rollback
, self
.fake_session
, {"name": self
.test_name
})
649 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
651 e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code"
654 "role name '{}' exists".format(self
.test_name
),
655 norm(str(e
.exception
)),
656 "Wrong exception text",
659 def test_conflict_on_edit(self
):
661 with self
.subTest(i
=1):
662 self
.auth
.get_role_list
.return_value
= [
663 {"_id": rid
, "name": self
.test_name
, "permissions": {}}
665 new_name
= str(uuid4())
666 with self
.assertRaises(
667 EngineException
, msg
="Accepted uuid as role name"
669 self
.topic
.edit(self
.fake_session
, rid
, {"name": new_name
})
671 e
.exception
.http_code
,
672 HTTPStatus
.UNPROCESSABLE_ENTITY
,
673 "Wrong HTTP status code",
676 "role name '{}' cannot have an uuid format".format(new_name
),
677 norm(str(e
.exception
)),
678 "Wrong exception text",
680 for i
, role_name
in enumerate(["system_admin", "project_admin"], start
=2):
681 with self
.subTest(i
=i
):
683 self
.auth
.get_role
.return_value
= {
688 with self
.assertRaises(
690 msg
="Accepted renaming of role '{}'".format(role_name
),
692 self
.topic
.edit(self
.fake_session
, rid
, {"name": "new-name"})
694 e
.exception
.http_code
,
695 HTTPStatus
.FORBIDDEN
,
696 "Wrong HTTP status code",
699 "you cannot rename role '{}'".format(role_name
),
700 norm(str(e
.exception
)),
701 "Wrong exception text",
703 with self
.subTest(i
=i
+ 1):
704 new_name
= "new-role-name"
705 self
.auth
.get_role_list
.side_effect
= [
706 [{"_id": rid
, "name": self
.test_name
, "permissions": {}}],
707 [{"_id": str(uuid4()), "name": new_name
, "permissions": {}}],
709 self
.auth
.get_role
.return_value
= {
711 "name": self
.test_name
,
714 with self
.assertRaises(
715 EngineException
, msg
="Accepted existing role name"
717 self
.topic
.edit(self
.fake_session
, rid
, {"name": new_name
})
719 e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code"
722 "role name '{}' exists".format(new_name
),
723 norm(str(e
.exception
)),
724 "Wrong exception text",
727 def test_conflict_on_del(self
):
728 for i
, role_name
in enumerate(["system_admin", "project_admin"], start
=1):
729 with self
.subTest(i
=i
):
731 role
= {"_id": rid
, "name": role_name
}
732 self
.auth
.get_role_list
.return_value
= [role
]
733 self
.auth
.get_role
.return_value
= role
734 with self
.assertRaises(
736 msg
="Accepted deletion of role '{}'".format(role_name
),
738 self
.topic
.delete(self
.fake_session
, rid
)
740 e
.exception
.http_code
,
741 HTTPStatus
.FORBIDDEN
,
742 "Wrong HTTP status code",
745 "you cannot delete role '{}'".format(role_name
),
746 norm(str(e
.exception
)),
747 "Wrong exception text",
749 with self
.subTest(i
=i
+ 1):
751 name
= "other-role-name"
752 role
= {"_id": rid
, "name": name
}
753 self
.auth
.get_role_list
.return_value
= [role
]
754 self
.auth
.get_role
.return_value
= role
755 self
.auth
.get_user_list
.return_value
= [
758 "username": self
.test_name
,
759 "project_role_mappings": [{"project": str(uuid4()), "role": rid
}],
762 with self
.assertRaises(
763 EngineException
, msg
="Accepted deletion of used role"
765 self
.topic
.delete(self
.fake_session
, rid
)
767 e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code"
770 "role '{}' ({}) is being used by user '{}'".format(
771 name
, rid
, self
.test_name
773 norm(str(e
.exception
)),
774 "Wrong exception text",
778 class Test_UserTopicAuth(TestCase
):
781 cls
.test_name
= "test-user-topic"
782 cls
.password
= "Test@123"
785 # self.db = Mock(dbbase.DbBase())
787 self
.fs
= Mock(fsbase
.FsBase())
788 self
.msg
= Mock(msgbase
.MsgBase())
789 self
.auth
= Mock(authconn
.Authconn(None, None, None))
790 self
.topic
= UserTopicAuth(self
.db
, self
.fs
, self
.msg
, self
.auth
)
791 self
.fake_session
= {
792 "username": test_name
,
793 "project_id": (test_pid
,),
798 "allow_show_user_project_role": True,
800 self
.topic
.check_quota
= Mock(return_value
=None) # skip quota
802 def test_new_user(self
):
805 self
.auth
.get_user_list
.return_value
= []
806 self
.auth
.get_project
.return_value
= {"_id": pid
, "name": "some_project"}
807 self
.auth
.create_user
.return_value
= {"_id": uid1
, "username": self
.test_name
}
808 with self
.subTest(i
=1):
811 self
.auth
.get_role
.return_value
= {"_id": rid
, "name": "some_role"}
812 prms_in
= [{"project": "some_project", "role": "some_role"}]
813 prms_out
= [{"project": pid
, "role": rid
}]
814 uid2
, oid
= self
.topic
.new(
818 "username": self
.test_name
,
819 "password": self
.password
,
820 "project_role_mappings": prms_in
,
823 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
824 self
.assertEqual(uid2
, uid1
, "Wrong project identifier")
825 content
= self
.auth
.create_user
.call_args
[0][0]
826 self
.assertEqual(content
["username"], self
.test_name
, "Wrong project name")
827 self
.assertEqual(content
["password"], self
.password
, "Wrong password")
829 content
["project_role_mappings"],
831 "Wrong project-role mappings",
833 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
835 content
["_admin"]["modified"],
836 content
["_admin"]["created"],
837 "Wrong modification time",
839 with self
.subTest(i
=2):
841 def_rid
= str(uuid4())
842 def_role
= {"_id": def_rid
, "name": "project_admin"}
843 self
.auth
.get_role
.return_value
= def_role
844 self
.auth
.get_role_list
.return_value
= [def_role
]
845 prms_out
= [{"project": pid
, "role": def_rid
}]
846 uid2
, oid
= self
.topic
.new(
850 "username": self
.test_name
,
851 "password": self
.password
,
852 "projects": ["some_project"],
855 self
.assertEqual(len(rollback
), 1, "Wrong rollback length")
856 self
.assertEqual(uid2
, uid1
, "Wrong project identifier")
857 content
= self
.auth
.create_user
.call_args
[0][0]
858 self
.assertEqual(content
["username"], self
.test_name
, "Wrong project name")
859 self
.assertEqual(content
["password"], self
.password
, "Wrong password")
861 content
["project_role_mappings"],
863 "Wrong project-role mappings",
865 self
.assertIsNotNone(content
["_admin"]["created"], "Wrong creation time")
867 content
["_admin"]["modified"],
868 content
["_admin"]["created"],
869 "Wrong modification time",
871 with self
.subTest(i
=3):
873 with self
.assertRaises(
874 EngineException
, msg
="Accepted wrong project-role mappings"
880 "username": "other-project-name",
881 "password": "Other@pwd1",
882 "project_role_mappings": [{}],
885 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
887 e
.exception
.http_code
,
888 HTTPStatus
.UNPROCESSABLE_ENTITY
,
889 "Wrong HTTP status code",
892 "format error at '{}' '{}'".format(
893 "project_role_mappings:{}", "'{}' is a required property"
894 ).format(0, "project"),
895 norm(str(e
.exception
)),
896 "Wrong exception text",
898 with self
.subTest(i
=4):
900 with self
.assertRaises(EngineException
, msg
="Accepted wrong projects") as e
:
905 "username": "other-project-name",
906 "password": "Other@pwd1",
910 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
912 e
.exception
.http_code
,
913 HTTPStatus
.UNPROCESSABLE_ENTITY
,
914 "Wrong HTTP status code",
917 "format error at '{}' '{}'".format(
918 "projects", "{} is too short"
920 norm(str(e
.exception
)),
921 "Wrong exception text",
924 def test_edit_user(self
):
932 "project_name": "project-1",
934 "role_name": "role-1",
939 "username": self
.test_name
,
940 "project_role_mappings": prms
,
941 "_admin": {"created": now
, "modified": now
},
943 with self
.subTest(i
=1):
944 self
.auth
.get_user_list
.side_effect
= [[user
], []]
945 self
.auth
.get_user
.return_value
= user
948 self
.auth
.get_project
.side_effect
= [
949 {"_id": pid2
, "name": "project-2"},
950 {"_id": pid1
, "name": "project-1"},
952 self
.auth
.get_role
.side_effect
= [
953 {"_id": rid2
, "name": "role-2"},
954 {"_id": rid1
, "name": "role-1"},
956 new_name
= "new-user-name"
957 new_pasw
= "New@pwd1"
958 add_prms
= [{"project": pid2
, "role": rid2
}]
959 rem_prms
= [{"project": pid1
, "role": rid1
}]
964 "username": new_name
,
965 "password": new_pasw
,
966 "add_project_role_mappings": add_prms
,
967 "remove_project_role_mappings": rem_prms
,
970 content
= self
.auth
.update_user
.call_args
[0][0]
971 self
.assertEqual(content
["_id"], uid
, "Wrong user identifier")
972 self
.assertEqual(content
["username"], new_name
, "Wrong user name")
973 self
.assertEqual(content
["password"], new_pasw
, "Wrong user password")
975 content
["add_project_role_mappings"],
977 "Wrong project-role mappings to add",
980 content
["remove_project_role_mappings"],
982 "Wrong project-role mappings to remove",
984 with self
.subTest(i
=2):
985 new_name
= "other-user-name"
987 self
.auth
.get_role_list
.side_effect
= [[user
], []]
988 self
.auth
.get_user_list
.side_effect
= [[user
]]
989 with self
.assertRaises(
990 EngineException
, msg
="Accepted wrong project-role mappings"
995 {"username": new_name
, "project_role_mappings": new_prms
},
998 e
.exception
.http_code
,
999 HTTPStatus
.UNPROCESSABLE_ENTITY
,
1000 "Wrong HTTP status code",
1003 "format error at '{}' '{}'".format(
1004 "project_role_mappings:{}", "'{}' is a required property"
1005 ).format(0, "project"),
1006 norm(str(e
.exception
)),
1007 "Wrong exception text",
1009 with self
.subTest(i
=3):
1010 self
.auth
.get_user_list
.side_effect
= [[user
], []]
1011 self
.auth
.get_user
.return_value
= user
1012 old_password
= self
.password
1013 new_pasw
= "New@pwd1"
1018 "old_password": old_password
,
1019 "password": new_pasw
,
1022 content
= self
.auth
.update_user
.call_args
[0][0]
1024 content
["old_password"], old_password
, "Wrong old password"
1026 self
.assertEqual(content
["password"], new_pasw
, "Wrong user password")
1028 def test_delete_user(self
):
1029 with self
.subTest(i
=1):
1031 self
.fake_session
["username"] = self
.test_name
1034 "username": "other-user-name",
1035 "project_role_mappings": [],
1037 self
.auth
.get_user
.return_value
= user
1038 self
.auth
.delete_user
.return_value
= {"deleted": 1}
1039 rc
= self
.topic
.delete(self
.fake_session
, uid
)
1040 self
.assertEqual(rc
, {"deleted": 1}, "Wrong user deletion return info")
1042 self
.auth
.get_user
.call_args
[0][0], uid
, "Wrong user identifier"
1045 self
.auth
.delete_user
.call_args
[0][0], uid
, "Wrong user identifier"
1048 def test_conflict_on_new(self
):
1049 with self
.subTest(i
=1):
1052 with self
.assertRaises(
1053 EngineException
, msg
="Accepted uuid as username"
1060 "password": self
.password
,
1061 "projects": [test_pid
],
1064 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
1066 e
.exception
.http_code
,
1067 HTTPStatus
.UNPROCESSABLE_ENTITY
,
1068 "Wrong HTTP status code",
1071 "username '{}' cannot have a uuid format".format(uid
),
1072 norm(str(e
.exception
)),
1073 "Wrong exception text",
1075 with self
.subTest(i
=2):
1077 self
.auth
.get_user_list
.return_value
= [
1078 {"_id": str(uuid4()), "username": self
.test_name
}
1080 with self
.assertRaises(
1081 EngineException
, msg
="Accepted existing username"
1087 "username": self
.test_name
,
1088 "password": self
.password
,
1089 "projects": [test_pid
],
1092 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
1094 e
.exception
.http_code
, HTTPStatus
.CONFLICT
, "Wrong HTTP status code"
1097 "username '{}' is already used".format(self
.test_name
),
1098 norm(str(e
.exception
)),
1099 "Wrong exception text",
1101 with self
.subTest(i
=3):
1103 self
.auth
.get_user_list
.return_value
= []
1104 self
.auth
.get_role_list
.side_effect
= [[], []]
1105 with self
.assertRaises(
1106 AuthconnNotFoundException
, msg
="Accepted user without default role"
1112 "username": self
.test_name
,
1113 "password": self
.password
,
1114 "projects": [str(uuid4())],
1117 self
.assertEqual(len(rollback
), 0, "Wrong rollback length")
1119 e
.exception
.http_code
, HTTPStatus
.NOT_FOUND
, "Wrong HTTP status code"
1122 "can't find default role for user '{}'".format(self
.test_name
),
1123 norm(str(e
.exception
)),
1124 "Wrong exception text",
def test_conflict_on_edit(self):
    """Check that ``self.topic.edit`` rejects conflicting user updates.

    Sub-tests:
      1. new username in UUID format -> EngineException, 422.
      2. no default role found for the user -> AuthconnNotFoundException, 404.
      3. removing system_admin from the admin user -> EngineException, 403.
      4. new username already in use -> EngineException, 409.

    NOTE(review): the ``uid`` assignment, the ``as e`` clauses and the
    assertion call openers were absent from the reviewed copy and are
    restored from context -- confirm against the upstream file.
    """
    uid = str(uuid4())
    with self.subTest(i=1):
        self.auth.get_user_list.return_value = [
            {"_id": uid, "username": self.test_name}
        ]
        new_name = str(uuid4())
        with self.assertRaises(
            EngineException, msg="Accepted uuid as username"
        ) as e:
            self.topic.edit(self.fake_session, uid, {"username": new_name})
        self.assertEqual(
            e.exception.http_code,
            HTTPStatus.UNPROCESSABLE_ENTITY,
            "Wrong HTTP status code",
        )
        self.assertIn(
            "username '{}' cannot have an uuid format".format(new_name),
            norm(str(e.exception)),
            "Wrong exception text",
        )
    with self.subTest(i=2):
        self.auth.get_user_list.return_value = [
            {"_id": uid, "username": self.test_name}
        ]
        # Both role lookups come back empty, so no default role can be found.
        self.auth.get_role_list.side_effect = [[], []]
        with self.assertRaises(
            AuthconnNotFoundException, msg="Accepted user without default role"
        ) as e:
            self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
        self.assertEqual(
            e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code"
        )
        self.assertIn(
            "can't find a default role for user '{}'".format(self.test_name),
            norm(str(e.exception)),
            "Wrong exception text",
        )
    with self.subTest(i=3):
        admin_uid = str(uuid4())
        self.auth.get_user_list.return_value = [
            {"_id": admin_uid, "username": "admin"}
        ]
        with self.assertRaises(
            EngineException,
            msg="Accepted removing system_admin role from admin user",
        ) as e:
            self.topic.edit(
                self.fake_session,
                admin_uid,
                {
                    "remove_project_role_mappings": [
                        {"project": "admin", "role": "system_admin"}
                    ]
                },
            )
        self.assertEqual(
            e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot remove system_admin role from admin user",
            norm(str(e.exception)),
            "Wrong exception text",
        )
    with self.subTest(i=4):
        new_name = "new-user-name"
        # First lookup returns the edited user, second one a different user
        # that already owns the requested name.
        self.auth.get_user_list.side_effect = [
            [{"_id": uid, "name": self.test_name}],
            [{"_id": str(uuid4()), "name": new_name}],
        ]
        with self.assertRaises(
            EngineException, msg="Accepted existing username"
        ) as e:
            self.topic.edit(self.fake_session, uid, {"username": new_name})
        self.assertEqual(
            e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        self.assertIn(
            "username '{}' is already used".format(new_name),
            norm(str(e.exception)),
            "Wrong exception text",
        )
def test_conflict_on_del(self):
    """Check that ``self.topic.delete`` refuses to delete the session's own user.

    The session username is set to the same name as the user returned by
    ``self.auth.get_user``; the call must raise EngineException with HTTP
    409 and the text "you cannot delete your own login user".

    NOTE(review): the ``uid``/``user`` bindings, the ``as e`` clause and
    the assertion call openers were absent from the reviewed copy and are
    restored from context -- confirm against the upstream file.
    """
    with self.subTest(i=1):
        uid = str(uuid4())
        self.fake_session["username"] = self.test_name
        user = {
            "_id": uid,
            "username": self.test_name,
            "project_role_mappings": [],
        }
        self.auth.get_user.return_value = user
        with self.assertRaises(
            EngineException, msg="Accepted deletion of own user"
        ) as e:
            self.topic.delete(self.fake_session, uid)
        self.assertEqual(
            e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
        )
        self.assertIn(
            "you cannot delete your own login user",
            norm(str(e.exception)),
            "Wrong exception text",
        )
def test_user_management(self):
    """Exercise ``AuthconnInternal`` user creation/update with user management on and off.

    A role and an "admin" user are stored in ``self.db`` first, then:
      1. ``user_management`` True: a created user is "active" and has
         ``password_expire_time`` and ``account_expire_time`` in ``_admin``.
      2. updating a "locked" user leaves it "active" with ``retry_count`` 0.
      3. updating an "expired" user leaves it "active" with a greater
         ``account_expire_time``.
      4. ``user_management`` False: a created user is "active" and has
         neither expiry key in ``_admin``.

    NOTE(review): dict openers/closers, the ``now``/``rid`` bindings and a
    few one-line entries were absent from the reviewed copy.  Entries
    marked "restored" below are not inferable from the visible code --
    confirm them against the upstream file.
    """
    self.config = {
        "user_management": True,
        "pwd_expire_days": 30,
        "max_pwd_attempt": 5,
        "account_expire_days": 90,
        "version": "dev",  # restored -- confirm
        "deviceVendor": "test",
        "deviceProduct": "test",
    }
    self.permissions = {"admin": True, "default": True}
    now = time()
    rid = str(uuid4())
    role = {
        "_id": rid,
        "name": self.test_name,
        "permissions": self.permissions,
        "_admin": {"created": now, "modified": now},
    }
    self.db.create("roles", role)
    admin_user = {
        "_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
        "username": "admin",
        "password": "bf0d9f988ad9b404464cf8c8749b298209b05fd404119bae0c11e247efbbc4cb",
        "_admin": {
            "created": 1663058370.7721832,
            "modified": 1663681183.5651639,
            "salt": "37587e7e0c2f4dbfb9416f3fb5543e2b",
            "last_token_time": 1666876472.2962265,
            "user_status": "always-active",
            "retry_count": 0,  # restored -- confirm
        },
        "project_role_mappings": [
            {"project": "a595ce4e-09dc-4b24-9d6f-e723830bc66b", "role": rid}
        ],
    }
    self.db.create("users", admin_user)
    with self.subTest(i=1):
        self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
        user_info = {"username": "user_mgmt_true", "password": "Test@123"}
        self.user_create.create_user(user_info)
        user = self.db.get_one("users", {"username": user_info["username"]})
        self.assertEqual(user["username"], user_info["username"], "Wrong user name")
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        self.assertIn("password_expire_time", user["_admin"], "Key is not there")
        self.assertIn("account_expire_time", user["_admin"], "Key is not there")
    with self.subTest(i=2):
        self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
        locked_user = {
            "username": "user_lock",
            "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
            "_admin": {
                "created": 1667207552.2191198,
                "modified": 1667207552.2191815,
                "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
                "user_status": "locked",
                "password_expire_time": 1667207552.2191815,
                "account_expire_time": now + 60,
                "retry_count": 5,  # restored -- confirm
                "last_token_time": 1667207552.2191815,
            },
            "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
        }
        self.db.create("users", locked_user)
        user_info = {
            "_id": "73bbbb71-ed38-4b79-9f58-ece19e7e32d6",
            "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
            "unlock": True,  # restored -- confirm
        }
        self.assertEqual(
            locked_user["_admin"]["user_status"], "locked", "User status is unknown"
        )
        self.user_update.update_user(user_info)
        user = self.db.get_one("users", {"username": locked_user["username"]})
        self.assertEqual(
            user["username"], locked_user["username"], "Wrong user name"
        )
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        self.assertEqual(user["_admin"]["retry_count"], 0, "retry_count is unknown")
    with self.subTest(i=3):
        self.user_update = AuthconnInternal(self.config, self.db, self.permissions)
        expired_user = {
            "username": "user_expire",
            "password": "c94ba8cfe81985cf5c84dff16d5bac95814ab17e44a8871755eb4cf3a27b7d3d",
            "_admin": {
                "created": 1665602087.601298,
                "modified": 1665636442.1245084,
                "salt": "560a5d51b1d64bb4b9cae0ccff3f1102",
                "user_status": "expired",
                "password_expire_time": 1668248628.2191815,
                "account_expire_time": 1666952628.2191815,
                "retry_count": 0,  # restored -- confirm
                "last_token_time": 1666779828.2171815,
            },
            "_id": "3266430f-8222-407f-b08f-3a242504ab94",
        }
        self.db.create("users", expired_user)
        user_info = {
            "_id": "3266430f-8222-407f-b08f-3a242504ab94",
            "system_admin_id": "72cd0cd6-e8e2-482c-9bc2-15b413bb8500",
            "renew": True,  # restored -- confirm
        }
        self.assertEqual(
            expired_user["_admin"]["user_status"],
            "expired",
            "User status is unknown",
        )
        self.user_update.update_user(user_info)
        user = self.db.get_one("users", {"username": expired_user["username"]})
        self.assertEqual(
            user["username"], expired_user["username"], "Wrong user name"
        )
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        self.assertGreater(
            user["_admin"]["account_expire_time"],
            expired_user["_admin"]["account_expire_time"],
            "User expire time is not get extended",
        )
    with self.subTest(i=4):
        # Same flow as sub-test 1, but with user management switched off.
        self.config.update({"user_management": False})
        self.user_create = AuthconnInternal(self.config, self.db, self.permissions)
        user_info = {"username": "user_mgmt_false", "password": "Test@123"}
        self.user_create.create_user(user_info)
        user = self.db.get_one("users", {"username": user_info["username"]})
        self.assertEqual(user["username"], user_info["username"], "Wrong user name")
        self.assertEqual(
            user["_admin"]["user_status"], "active", "User status is unknown"
        )
        # These assertions fail when the key IS present, so the message says so.
        self.assertNotIn(
            "password_expire_time", user["_admin"], "Key should not be there"
        )
        self.assertNotIn(
            "account_expire_time", user["_admin"], "Key should not be there"
        )
class Test_CommonVimWimSdn(TestCase):
    """Tests for ``CommonVimWimSdn`` new/edit/delete, run against mocked db/fs/msg/auth.

    The topic under test is configured as "wims" with the WIM account
    schemas, and its ``_send_msg`` and ``check_quota`` are replaced by mocks.

    NOTE(review): short lines (decorator, ``setUp`` header, local bindings
    such as ``rollback``/``cid``/``now``/``test_type``, assertion openers,
    and a few one-line literals) were absent from the reviewed copy.
    Values marked "restored" are not inferable from the visible code --
    confirm them against the upstream file.
    """

    @classmethod
    def setUpClass(cls):
        cls.test_name = "test-cim-topic"  # CIM = Common Infrastructure Manager

    def setUp(self):
        """Build a ``CommonVimWimSdn`` over mocks and a fake session."""
        self.db = Mock(dbbase.DbBase())
        self.fs = Mock(fsbase.FsBase())
        self.msg = Mock(msgbase.MsgBase())
        self.auth = Mock(authconn.Authconn(None, None, None))
        self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
        # Use WIM schemas for testing because they are the simplest
        self.topic._send_msg = Mock()
        self.topic.topic = "wims"
        self.topic.schema_new = validation.wim_account_new_schema
        self.topic.schema_edit = validation.wim_account_edit_schema
        self.fake_session = {
            "username": test_name,
            "project_id": (test_pid,),
            "method": None,  # restored -- confirm
            "admin": True,  # restored -- confirm
            "force": False,  # restored -- confirm
            "public": False,  # restored -- confirm
            "allow_show_user_project_role": True,
        }
        self.topic.check_quota = Mock(return_value=None)  # skip quota

    def test_new_cvws(self):
        """Create an item and check the content handed to ``db.create``."""
        test_url = "http://0.0.0.0:0"
        with self.subTest(i=1):
            rollback = []
            test_type = "fake"  # restored -- confirm
            self.db.get_one.return_value = None
            self.db.create.side_effect = lambda self, content: content["_id"]
            cid, oid = self.topic.new(
                rollback,
                self.fake_session,
                {"name": self.test_name, "wim_url": test_url, "wim_type": test_type},
            )
            self.assertEqual(len(rollback), 1, "Wrong rollback length")
            args = self.db.create.call_args[0]
            content = args[1]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
            self.assertEqual(content["wim_url"], test_url, "Wrong URL")
            self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
            self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
            self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
            self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
            self.assertEqual(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                content["_admin"]["operationalState"],
                "PROCESSING",  # restored -- confirm
                "Wrong operational state",
            )
            self.assertEqual(
                content["_admin"]["projects_read"],
                [test_pid],  # restored -- confirm
                "Wrong read-only projects",
            )
            self.assertEqual(
                content["_admin"]["projects_write"],
                [test_pid],  # restored -- confirm
                "Wrong read/write projects",
            )
            self.assertIsNone(
                content["_admin"]["current_operation"], "Wrong current operation"
            )
            self.assertEqual(
                len(content["_admin"]["operations"]), 1, "Wrong number of operations"
            )
            operation = content["_admin"]["operations"][0]
            self.assertEqual(
                operation["lcmOperationType"], "create", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertEqual(
                operation["startTime"],
                content["_admin"]["created"],
                "Wrong operation start time",
            )
            self.assertEqual(
                operation["statusEnteredTime"],
                content["_admin"]["created"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status info"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        # This test is disabled. From Feature 8030 we admit all WIM/SDN types
        # with self.subTest(i=2):
        #   rollback = []
        #   test_type = "bad_type"
        #   with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
        #       self.topic.new(rollback, self.fake_session,
        #                      {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
        #   self.assertEqual(len(rollback), 0, "Wrong rollback length")
        #   self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
        #   self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
        #                 norm(str(e.exception)), "Wrong exception text")

    def test_conflict_on_new(self):
        """Creating an item whose name already exists must raise a 409."""
        with self.subTest(i=1):
            rollback = []
            test_url = "http://0.0.0.0:0"
            test_type = "fake"  # restored -- confirm
            # The name lookup finds an existing item with the same name.
            self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.new(
                    rollback,
                    self.fake_session,
                    {
                        "name": self.test_name,
                        "wim_url": test_url,
                        "wim_type": test_type,
                    },
                )
            self.assertEqual(len(rollback), 0, "Wrong rollback length")
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(
                    self.test_name, self.topic.topic
                ),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_edit_cvws(self):
        """Edit an item and check the content handed to ``db.replace``;
        an unexpected property must raise a 422."""
        now = time()
        cid = str(uuid4())
        test_url = "http://0.0.0.0:0"
        test_type = "fake"  # restored -- confirm
        cvws = {
            "_id": cid,
            "name": self.test_name,
            "wim_url": test_url,
            "wim_type": test_type,
            "_admin": {
                "created": now,
                "modified": now,
                "operations": [{"lcmOperationType": "create"}],
            },
        }
        with self.subTest(i=1):
            new_name = "new-cim-name"
            new_url = "https://1.1.1.1:1"
            new_type = "onos"  # restored -- confirm
            # First lookup returns the stored item, second finds no name clash.
            self.db.get_one.side_effect = [cvws, None]
            self.db.replace.return_value = {"updated": 1}
            # self.db.encrypt.side_effect = [b64str(), b64str()]
            self.topic.edit(
                self.fake_session,
                cid,
                {"name": new_name, "wim_url": new_url, "wim_type": new_type},
            )
            args = self.db.replace.call_args[0]
            content = args[2]
            self.assertEqual(args[0], self.topic.topic, "Wrong topic")
            self.assertEqual(args[1], cid, "Wrong CIM identifier")
            self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
            self.assertEqual(content["name"], new_name, "Wrong CIM name")
            self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
            self.assertEqual(content["wim_url"], new_url, "Wrong URL")
            self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
            self.assertGreater(
                content["_admin"]["modified"],
                content["_admin"]["created"],
                "Wrong modification time",
            )
            self.assertEqual(
                len(content["_admin"]["operations"]), 2, "Wrong number of operations"
            )
            operation = content["_admin"]["operations"][1]
            self.assertEqual(
                operation["lcmOperationType"], "edit", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertEqual(
                operation["startTime"],
                content["_admin"]["modified"],
                "Wrong operation start time",
            )
            self.assertEqual(
                operation["statusEnteredTime"],
                content["_admin"]["modified"],
                "Wrong operation status enter time",
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status info"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
        with self.subTest(i=2):
            self.db.get_one.side_effect = [cvws]
            with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
                self.topic.edit(
                    self.fake_session,
                    str(uuid4()),  # restored -- confirm
                    {"name": "new-name", "extra_prop": "anything"},
                )
            self.assertEqual(
                e.exception.http_code,
                HTTPStatus.UNPROCESSABLE_ENTITY,
                "Wrong HTTP status code",
            )
            self.assertIn(
                "format error '{}'".format(
                    "additional properties are not allowed ('{}' was unexpected)"
                ).format("extra_prop"),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_conflict_on_edit(self):
        """Renaming an item to a name that already exists must raise a 409."""
        with self.subTest(i=1):
            cid = str(uuid4())
            new_name = "new-cim-name"
            # First lookup returns the edited item, second one another item
            # that already uses the new name.
            self.db.get_one.side_effect = [
                {"_id": cid, "name": self.test_name},
                {"_id": str(uuid4()), "name": new_name},
            ]
            with self.assertRaises(
                EngineException, msg="Accepted existing CIM name"
            ) as e:
                self.topic.edit(self.fake_session, cid, {"name": new_name})
            self.assertEqual(
                e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code"
            )
            self.assertIn(
                "name '{}' already exists for {}".format(new_name, self.topic.topic),
                norm(str(e.exception)),
                "Wrong exception text",
            )

    def test_delete_cvws(self):
        """Check the three delete outcomes asserted below.

        1. other projects still reference the item: ``db.set_one`` is called
           with a ``pull_list`` for the session project, no message is sent
           and no operation id is returned.
        2. only the session project references it: the item is flagged
           ``_admin.to_delete``, a "delete" operation is pushed and a
           "delete" message is sent.
        3. forced deletion: ``db.del_one`` is called and a "deleted" message
           is sent with ``op_id`` None.
        """
        cid = str(uuid4())
        ro_pid = str(uuid4())
        rw_pid = str(uuid4())
        cvws = {"_id": cid, "name": self.test_name}
        self.db.get_list.return_value = []
        with self.subTest(i=1):
            cvws["_admin"] = {
                "projects_read": [test_pid, ro_pid, rw_pid],
                "projects_write": [test_pid, rw_pid],
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["update_dict"],
                None,  # restored -- confirm
                "Wrong read-only projects update",
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["pull_list"],
                {
                    "_admin.projects_read": (test_pid,),
                    "_admin.projects_write": (test_pid,),
                },
                "Wrong read/write projects update",
            )
            self.topic._send_msg.assert_not_called()
        with self.subTest(i=2):
            now = time()
            cvws["_admin"] = {
                "projects_read": [test_pid],
                "projects_write": [test_pid],
                "operations": [],  # restored -- confirm
            }
            self.db.get_one.return_value = cvws
            oid = self.topic.delete(self.fake_session, cid)
            self.assertEqual(oid, cid + ":0", "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            # Message fixed: this checks the CIM id, not a user id.
            self.assertEqual(
                self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.set_one.call_args[1]["update_dict"],
                {"_admin.to_delete": True},
                "Wrong _admin.to_delete update",
            )
            operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
            self.assertEqual(
                operation["lcmOperationType"], "delete", "Wrong operation type"
            )
            self.assertEqual(
                operation["operationState"], "PROCESSING", "Wrong operation state"
            )
            self.assertEqual(
                operation["detailed-status"], "", "Wrong operation detailed status"
            )
            self.assertIsNone(
                operation["operationParams"], "Wrong operation parameters"
            )
            self.assertGreater(
                operation["startTime"], now, "Wrong operation start time"
            )
            self.assertGreater(
                operation["statusEnteredTime"], now, "Wrong operation status enter time"
            )
            self.topic._send_msg.assert_called_once_with(
                "delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None
            )
        with self.subTest(i=3):
            cvws["_admin"] = {
                "projects_read": [],
                "projects_write": [],
                "operations": [],  # restored -- confirm
            }
            self.db.get_one.return_value = cvws
            # Clear call history left by the previous sub-tests.
            self.topic._send_msg.reset_mock()
            self.db.get_one.reset_mock()
            self.db.del_one.reset_mock()
            self.fake_session["force"] = True  # to force deletion
            self.fake_session["admin"] = True  # to force deletion
            self.fake_session["project_id"] = []  # to force deletion
            oid = self.topic.delete(self.fake_session, cid)
            self.assertIsNone(oid, "Wrong operation identifier")
            self.assertEqual(
                self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic"
            )
            self.assertEqual(
                self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier"
            )
            self.topic._send_msg.assert_called_once_with(
                "deleted", {"_id": cid, "op_id": None}, not_send_msg=None
            )
1743 if __name__
== "__main__":