Feature 10239: Distributed VCA
[osm/NBI.git] / osm_nbi / tests / test_admin_topics.py
1 #! /usr/bin/python3
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16
17 __author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18 __date__ = "$2019-10-019"
19
20 import unittest
21 from unittest import TestCase
22 from unittest.mock import Mock, patch, call
23 from uuid import uuid4
24 from http import HTTPStatus
25 from time import time
26 from random import randint
27 from osm_common import dbbase, fsbase, msgbase
28 from osm_nbi import authconn, validation
29 from osm_nbi.admin_topics import (
30 ProjectTopicAuth,
31 RoleTopicAuth,
32 UserTopicAuth,
33 CommonVimWimSdn,
34 VcaTopic,
35 )
36 from osm_nbi.engine import EngineException
37 from osm_nbi.authconn import AuthconnNotFoundException
38
39
40 test_pid = str(uuid4())
41 test_name = "test-user"
42
43
44 def norm(str):
45 """Normalize string for checking"""
46 return ' '.join(str.strip().split()).lower()
47
48
49 class TestVcaTopic(TestCase):
50 def setUp(self):
51 self.db = Mock(dbbase.DbBase())
52 self.fs = Mock(fsbase.FsBase())
53 self.msg = Mock(msgbase.MsgBase())
54 self.auth = Mock(authconn.Authconn(None, None, None))
55 self.vca_topic = VcaTopic(self.db, self.fs, self.msg, self.auth)
56
57 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_new")
58 def test_format_on_new(self, mock_super_format_on_new):
59 content = {
60 "_id": "id",
61 "secret": "encrypted_secret",
62 "cacert": "encrypted_cacert",
63 }
64 self.db.encrypt.side_effect = ["secret", "cacert"]
65 mock_super_format_on_new.return_value = "1234"
66
67 oid = self.vca_topic.format_on_new(content)
68
69 self.assertEqual(oid, "1234")
70 self.assertEqual(content["secret"], "secret")
71 self.assertEqual(content["cacert"], "cacert")
72 self.db.encrypt.assert_has_calls(
73 [
74 call("encrypted_secret", schema_version="1.11", salt="id"),
75 call("encrypted_cacert", schema_version="1.11", salt="id"),
76 ]
77 )
78 mock_super_format_on_new.assert_called_with(content, None, False)
79
80 @patch("osm_nbi.admin_topics.CommonVimWimSdn.format_on_edit")
81 def test_format_on_edit(self, mock_super_format_on_edit):
82 edit_content = {
83 "_id": "id",
84 "secret": "encrypted_secret",
85 "cacert": "encrypted_cacert",
86 }
87 final_content = {
88 "_id": "id",
89 "schema_version": "1.11",
90 }
91 self.db.encrypt.side_effect = ["secret", "cacert"]
92 mock_super_format_on_edit.return_value = "1234"
93
94 oid = self.vca_topic.format_on_edit(final_content, edit_content)
95
96 self.assertEqual(oid, "1234")
97 self.assertEqual(final_content["secret"], "secret")
98 self.assertEqual(final_content["cacert"], "cacert")
99 self.db.encrypt.assert_has_calls(
100 [
101 call("encrypted_secret", schema_version="1.11", salt="id"),
102 call("encrypted_cacert", schema_version="1.11", salt="id"),
103 ]
104 )
105 mock_super_format_on_edit.assert_called()
106
107 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
108 def test_check_conflict_on_del(self, mock_check_conflict_on_del):
109 session = {
110 "project_id": "project-id",
111 "force": False,
112 }
113 _id = "vca-id"
114 db_content = {}
115
116 self.db.get_list.return_value = None
117
118 self.vca_topic.check_conflict_on_del(session, _id, db_content)
119
120 self.db.get_list.assert_called_with(
121 "vim_accounts",
122 {"vca": _id, '_admin.projects_read.cont': 'project-id'},
123 )
124 mock_check_conflict_on_del.assert_called_with(session, _id, db_content)
125
126 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
127 def test_check_conflict_on_del_force(self, mock_check_conflict_on_del):
128 session = {
129 "project_id": "project-id",
130 "force": True,
131 }
132 _id = "vca-id"
133 db_content = {}
134
135 self.vca_topic.check_conflict_on_del(session, _id, db_content)
136
137 self.db.get_list.assert_not_called()
138 mock_check_conflict_on_del.assert_not_called()
139
140 @patch("osm_nbi.admin_topics.CommonVimWimSdn.check_conflict_on_del")
141 def test_check_conflict_on_del_with_conflict(self, mock_check_conflict_on_del):
142 session = {
143 "project_id": "project-id",
144 "force": False,
145 }
146 _id = "vca-id"
147 db_content = {}
148
149 self.db.get_list.return_value = {"_id": "vim", "vca": "vca-id"}
150
151 with self.assertRaises(EngineException) as context:
152 self.vca_topic.check_conflict_on_del(session, _id, db_content)
153 self.assertEqual(
154 context.exception,
155 EngineException(
156 "There is at least one VIM account using this vca",
157 http_code=HTTPStatus.CONFLICT
158 )
159 )
160
161 self.db.get_list.assert_called_with(
162 "vim_accounts",
163 {"vca": _id, '_admin.projects_read.cont': 'project-id'},
164 )
165 mock_check_conflict_on_del.assert_not_called()
166
167
168 class Test_ProjectTopicAuth(TestCase):
169
170 @classmethod
171 def setUpClass(cls):
172 cls.test_name = "test-project-topic"
173
174 def setUp(self):
175 self.db = Mock(dbbase.DbBase())
176 self.fs = Mock(fsbase.FsBase())
177 self.msg = Mock(msgbase.MsgBase())
178 self.auth = Mock(authconn.Authconn(None, None, None))
179 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
180 self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None,
181 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
182 self.topic.check_quota = Mock(return_value=None) # skip quota
183
184 def test_new_project(self):
185 with self.subTest(i=1):
186 rollback = []
187 pid1 = str(uuid4())
188 self.auth.get_project_list.return_value = []
189 self.auth.create_project.return_value = pid1
190 pid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "quotas": {}})
191 self.assertEqual(len(rollback), 1, "Wrong rollback length")
192 self.assertEqual(pid2, pid1, "Wrong project identifier")
193 content = self.auth.create_project.call_args[0][0]
194 self.assertEqual(content["name"], self.test_name, "Wrong project name")
195 self.assertEqual(content["quotas"], {}, "Wrong quotas")
196 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
197 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
198 with self.subTest(i=2):
199 rollback = []
200 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
201 self.topic.new(rollback, self.fake_session, {"name": "other-project-name", "quotas": {"baditems": 10}})
202 self.assertEqual(len(rollback), 0, "Wrong rollback length")
203 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
204 self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
205 .format("baditems"), norm(str(e.exception)), "Wrong exception text")
206
207 def test_edit_project(self):
208 now = time()
209 pid = str(uuid4())
210 proj = {"_id": pid, "name": self.test_name, "_admin": {"created": now, "modified": now}}
211 with self.subTest(i=1):
212 self.auth.get_project_list.side_effect = [[proj], []]
213 new_name = "new-project-name"
214 quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
215 self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
216 _id, content = self.auth.update_project.call_args[0]
217 self.assertEqual(_id, pid, "Wrong project identifier")
218 self.assertEqual(content["_id"], pid, "Wrong project identifier")
219 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
220 self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
221 self.assertEqual(content["name"], new_name, "Wrong project name")
222 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
223 with self.subTest(i=2):
224 new_name = "other-project-name"
225 quotas = {"baditems": randint(0, 100)}
226 self.auth.get_project_list.side_effect = [[proj], []]
227 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
228 self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
229 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
230 self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
231 .format("baditems"), norm(str(e.exception)), "Wrong exception text")
232
233 def test_conflict_on_new(self):
234 with self.subTest(i=1):
235 rollback = []
236 pid = str(uuid4())
237 with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
238 self.topic.new(rollback, self.fake_session, {"name": pid})
239 self.assertEqual(len(rollback), 0, "Wrong rollback length")
240 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
241 self.assertIn("project name '{}' cannot have an uuid format".format(pid),
242 norm(str(e.exception)), "Wrong exception text")
243 with self.subTest(i=2):
244 rollback = []
245 self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
246 with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
247 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
248 self.assertEqual(len(rollback), 0, "Wrong rollback length")
249 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
250 self.assertIn("project '{}' exists".format(self.test_name),
251 norm(str(e.exception)), "Wrong exception text")
252
253 def test_conflict_on_edit(self):
254 with self.subTest(i=1):
255 self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
256 new_name = str(uuid4())
257 with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
258 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
259 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
260 self.assertIn("project name '{}' cannot have an uuid format".format(new_name),
261 norm(str(e.exception)), "Wrong exception text")
262 with self.subTest(i=2):
263 pid = str(uuid4())
264 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
265 with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as e:
266 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
267 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
268 self.assertIn("you cannot rename project 'admin'",
269 norm(str(e.exception)), "Wrong exception text")
270 with self.subTest(i=3):
271 new_name = "new-project-name"
272 self.auth.get_project_list.side_effect = [[{"_id": test_pid, "name": self.test_name}],
273 [{"_id": str(uuid4()), "name": new_name}]]
274 with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
275 self.topic.edit(self.fake_session, pid, {"name": new_name})
276 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
277 self.assertIn("project '{}' is already used".format(new_name),
278 norm(str(e.exception)), "Wrong exception text")
279
280 def test_delete_project(self):
281 with self.subTest(i=1):
282 pid = str(uuid4())
283 self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"}
284 self.auth.delete_project.return_value = {"deleted": 1}
285 self.auth.get_user_list.return_value = []
286 self.db.get_list.return_value = []
287 rc = self.topic.delete(self.fake_session, pid)
288 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
289 self.assertEqual(self.auth.get_project.call_args[0][0], pid, "Wrong project identifier")
290 self.assertEqual(self.auth.delete_project.call_args[0][0], pid, "Wrong project identifier")
291
292 def test_conflict_on_del(self):
293 with self.subTest(i=1):
294 self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name}
295 with self.assertRaises(EngineException, msg="Accepted deletion of own project") as e:
296 self.topic.delete(self.fake_session, self.test_name)
297 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
298 self.assertIn("you cannot delete your own project", norm(str(e.exception)), "Wrong exception text")
299 with self.subTest(i=2):
300 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
301 with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as e:
302 self.topic.delete(self.fake_session, "admin")
303 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
304 self.assertIn("you cannot delete project 'admin'", norm(str(e.exception)), "Wrong exception text")
305 with self.subTest(i=3):
306 pid = str(uuid4())
307 name = "other-project-name"
308 self.auth.get_project.return_value = {"_id": pid, "name": name}
309 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
310 "project_role_mappings": [{"project": pid, "role": str(uuid4())}]}]
311 with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
312 self.topic.delete(self.fake_session, pid)
313 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
314 self.assertIn("project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name),
315 norm(str(e.exception)), "Wrong exception text")
316 with self.subTest(i=4):
317 self.auth.get_user_list.return_value = []
318 self.db.get_list.return_value = [{"_id": str(uuid4()), "id": self.test_name,
319 "_admin": {"projects_read": [pid], "projects_write": []}}]
320 with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
321 self.topic.delete(self.fake_session, pid)
322 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
323 self.assertIn("project '{}' ({}) is being used by {} '{}'"
324 .format(name, pid, "vnf descriptor", self.test_name),
325 norm(str(e.exception)), "Wrong exception text")
326
327
328 class Test_RoleTopicAuth(TestCase):
329
330 @classmethod
331 def setUpClass(cls):
332 cls.test_name = "test-role-topic"
333 cls.test_operations = ["tokens:get"]
334
335 def setUp(self):
336 self.db = Mock(dbbase.DbBase())
337 self.fs = Mock(fsbase.FsBase())
338 self.msg = Mock(msgbase.MsgBase())
339 self.auth = Mock(authconn.Authconn(None, None, None))
340 self.auth.role_permissions = self.test_operations
341 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
342 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
343 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
344 self.topic.check_quota = Mock(return_value=None) # skip quota
345
346 def test_new_role(self):
347 with self.subTest(i=1):
348 rollback = []
349 rid1 = str(uuid4())
350 perms_in = {"tokens": True}
351 perms_out = {"default": False, "admin": False, "tokens": True}
352 self.auth.get_role_list.return_value = []
353 self.auth.create_role.return_value = rid1
354 rid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in})
355 self.assertEqual(len(rollback), 1, "Wrong rollback length")
356 self.assertEqual(rid2, rid1, "Wrong project identifier")
357 content = self.auth.create_role.call_args[0][0]
358 self.assertEqual(content["name"], self.test_name, "Wrong role name")
359 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
360 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
361 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
362 with self.subTest(i=2):
363 rollback = []
364 with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
365 self.topic.new(rollback, self.fake_session,
366 {"name": "other-role-name", "permissions": {"projects": True}})
367 self.assertEqual(len(rollback), 0, "Wrong rollback length")
368 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
369 self.assertIn("invalid permission '{}'".format("projects"),
370 norm(str(e.exception)), "Wrong exception text")
371
372 def test_edit_role(self):
373 now = time()
374 rid = str(uuid4())
375 role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True},
376 "_admin": {"created": now, "modified": now}}
377 with self.subTest(i=1):
378 self.auth.get_role_list.side_effect = [[role], []]
379 self.auth.get_role.return_value = role
380 new_name = "new-role-name"
381 perms_in = {"tokens": False, "tokens:get": True}
382 perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True}
383 self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
384 content = self.auth.update_role.call_args[0][0]
385 self.assertEqual(content["_id"], rid, "Wrong role identifier")
386 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
387 self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
388 self.assertEqual(content["name"], new_name, "Wrong role name")
389 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
390 with self.subTest(i=2):
391 new_name = "other-role-name"
392 perms_in = {"tokens": False, "tokens:post": True}
393 self.auth.get_role_list.side_effect = [[role], []]
394 with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
395 self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
396 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
397 self.assertIn("invalid permission '{}'".format("tokens:post"),
398 norm(str(e.exception)), "Wrong exception text")
399
400 def test_delete_role(self):
401 with self.subTest(i=1):
402 rid = str(uuid4())
403 role = {"_id": rid, "name": "other-role-name"}
404 self.auth.get_role_list.return_value = [role]
405 self.auth.get_role.return_value = role
406 self.auth.delete_role.return_value = {"deleted": 1}
407 self.auth.get_user_list.return_value = []
408 rc = self.topic.delete(self.fake_session, rid)
409 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
410 self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier")
411 self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier")
412 self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier")
413
414 def test_conflict_on_new(self):
415 with self.subTest(i=1):
416 rollback = []
417 rid = str(uuid4())
418 with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
419 self.topic.new(rollback, self.fake_session, {"name": rid})
420 self.assertEqual(len(rollback), 0, "Wrong rollback length")
421 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
422 self.assertIn("role name '{}' cannot have an uuid format".format(rid),
423 norm(str(e.exception)), "Wrong exception text")
424 with self.subTest(i=2):
425 rollback = []
426 self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}]
427 with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
428 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
429 self.assertEqual(len(rollback), 0, "Wrong rollback length")
430 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
431 self.assertIn("role name '{}' exists".format(self.test_name),
432 norm(str(e.exception)), "Wrong exception text")
433
434 def test_conflict_on_edit(self):
435 rid = str(uuid4())
436 with self.subTest(i=1):
437 self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}]
438 new_name = str(uuid4())
439 with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
440 self.topic.edit(self.fake_session, rid, {"name": new_name})
441 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
442 self.assertIn("role name '{}' cannot have an uuid format".format(new_name),
443 norm(str(e.exception)), "Wrong exception text")
444 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
445 with self.subTest(i=i):
446 rid = str(uuid4())
447 self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}}
448 with self.assertRaises(EngineException, msg="Accepted renaming of role '{}'".format(role_name)) as e:
449 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
450 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
451 self.assertIn("you cannot rename role '{}'".format(role_name),
452 norm(str(e.exception)), "Wrong exception text")
453 with self.subTest(i=i+1):
454 new_name = "new-role-name"
455 self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}],
456 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]]
457 self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}}
458 with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
459 self.topic.edit(self.fake_session, rid, {"name": new_name})
460 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
461 self.assertIn("role name '{}' exists".format(new_name),
462 norm(str(e.exception)), "Wrong exception text")
463
464 def test_conflict_on_del(self):
465 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
466 with self.subTest(i=i):
467 rid = str(uuid4())
468 role = {"_id": rid, "name": role_name}
469 self.auth.get_role_list.return_value = [role]
470 self.auth.get_role.return_value = role
471 with self.assertRaises(EngineException, msg="Accepted deletion of role '{}'".format(role_name)) as e:
472 self.topic.delete(self.fake_session, rid)
473 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
474 self.assertIn("you cannot delete role '{}'".format(role_name),
475 norm(str(e.exception)), "Wrong exception text")
476 with self.subTest(i=i+1):
477 rid = str(uuid4())
478 name = "other-role-name"
479 role = {"_id": rid, "name": name}
480 self.auth.get_role_list.return_value = [role]
481 self.auth.get_role.return_value = role
482 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
483 "project_role_mappings": [{"project": str(uuid4()), "role": rid}]}]
484 with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e:
485 self.topic.delete(self.fake_session, rid)
486 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
487 self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name),
488 norm(str(e.exception)), "Wrong exception text")
489
490
491 class Test_UserTopicAuth(TestCase):
492
493 @classmethod
494 def setUpClass(cls):
495 cls.test_name = "test-user-topic"
496
497 def setUp(self):
498 self.db = Mock(dbbase.DbBase())
499 self.fs = Mock(fsbase.FsBase())
500 self.msg = Mock(msgbase.MsgBase())
501 self.auth = Mock(authconn.Authconn(None, None, None))
502 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
503 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
504 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
505 self.topic.check_quota = Mock(return_value=None) # skip quota
506
507 def test_new_user(self):
508 uid1 = str(uuid4())
509 pid = str(uuid4())
510 self.auth.get_user_list.return_value = []
511 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
512 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
513 with self.subTest(i=1):
514 rollback = []
515 rid = str(uuid4())
516 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
517 prms_in = [{"project": "some_project", "role": "some_role"}]
518 prms_out = [{"project": pid, "role": rid}]
519 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
520 "password": self.test_name,
521 "project_role_mappings": prms_in
522 })
523 self.assertEqual(len(rollback), 1, "Wrong rollback length")
524 self.assertEqual(uid2, uid1, "Wrong project identifier")
525 content = self.auth.create_user.call_args[0][0]
526 self.assertEqual(content["username"], self.test_name, "Wrong project name")
527 self.assertEqual(content["password"], self.test_name, "Wrong password")
528 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
529 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
530 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
531 with self.subTest(i=2):
532 rollback = []
533 def_rid = str(uuid4())
534 def_role = {"_id": def_rid, "name": "project_admin"}
535 self.auth.get_role.return_value = def_role
536 self.auth.get_role_list.return_value = [def_role]
537 prms_out = [{"project": pid, "role": def_rid}]
538 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
539 "password": self.test_name,
540 "projects": ["some_project"]
541 })
542 self.assertEqual(len(rollback), 1, "Wrong rollback length")
543 self.assertEqual(uid2, uid1, "Wrong project identifier")
544 content = self.auth.create_user.call_args[0][0]
545 self.assertEqual(content["username"], self.test_name, "Wrong project name")
546 self.assertEqual(content["password"], self.test_name, "Wrong password")
547 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
548 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
549 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
550 with self.subTest(i=3):
551 rollback = []
552 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
553 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
554 "password": "other-password",
555 "project_role_mappings": [{}]
556 })
557 self.assertEqual(len(rollback), 0, "Wrong rollback length")
558 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
559 self.assertIn("format error at '{}' '{}'"
560 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
561 norm(str(e.exception)), "Wrong exception text")
562 with self.subTest(i=4):
563 rollback = []
564 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
565 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
566 "password": "other-password",
567 "projects": []
568 })
569 self.assertEqual(len(rollback), 0, "Wrong rollback length")
570 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
571 self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]),
572 norm(str(e.exception)), "Wrong exception text")
573
574 def test_edit_user(self):
575 now = time()
576 uid = str(uuid4())
577 pid1 = str(uuid4())
578 rid1 = str(uuid4())
579 prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}]
580 user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms,
581 "_admin": {"created": now, "modified": now}}
582 with self.subTest(i=1):
583 self.auth.get_user_list.side_effect = [[user], []]
584 self.auth.get_user.return_value = user
585 pid2 = str(uuid4())
586 rid2 = str(uuid4())
587 self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"},
588 {"_id": pid1, "name": "project-1"}]
589 self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"},
590 {"_id": rid1, "name": "role-1"}]
591 new_name = "new-user-name"
592 new_pasw = "new-password"
593 add_prms = [{"project": pid2, "role": rid2}]
594 rem_prms = [{"project": pid1, "role": rid1}]
595 self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw,
596 "add_project_role_mappings": add_prms,
597 "remove_project_role_mappings": rem_prms
598 })
599 content = self.auth.update_user.call_args[0][0]
600 self.assertEqual(content["_id"], uid, "Wrong user identifier")
601 self.assertEqual(content["username"], new_name, "Wrong user name")
602 self.assertEqual(content["password"], new_pasw, "Wrong user password")
603 self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add")
604 self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove")
605 with self.subTest(i=2):
606 new_name = "other-user-name"
607 new_prms = [{}]
608 self.auth.get_role_list.side_effect = [[user], []]
609 self.auth.get_user_list.side_effect = [[user]]
610 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
611 self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms})
612 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
613 self.assertIn("format error at '{}' '{}'"
614 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
615 norm(str(e.exception)), "Wrong exception text")
616
617 def test_delete_user(self):
618 with self.subTest(i=1):
619 uid = str(uuid4())
620 self.fake_session["username"] = self.test_name
621 user = user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []}
622 self.auth.get_user.return_value = user
623 self.auth.delete_user.return_value = {"deleted": 1}
624 rc = self.topic.delete(self.fake_session, uid)
625 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
626 self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier")
627 self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier")
628
629 def test_conflict_on_new(self):
630 with self.subTest(i=1):
631 rollback = []
632 uid = str(uuid4())
633 with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
634 self.topic.new(rollback, self.fake_session, {"username": uid, "password": self.test_name,
635 "projects": [test_pid]})
636 self.assertEqual(len(rollback), 0, "Wrong rollback length")
637 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
638 self.assertIn("username '{}' cannot have a uuid format".format(uid),
639 norm(str(e.exception)), "Wrong exception text")
640 with self.subTest(i=2):
641 rollback = []
642 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}]
643 with self.assertRaises(EngineException, msg="Accepted existing username") as e:
644 self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
645 "projects": [test_pid]})
646 self.assertEqual(len(rollback), 0, "Wrong rollback length")
647 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
648 self.assertIn("username '{}' is already used".format(self.test_name),
649 norm(str(e.exception)), "Wrong exception text")
650 with self.subTest(i=3):
651 rollback = []
652 self.auth.get_user_list.return_value = []
653 self.auth.get_role_list.side_effect = [[], []]
654 with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
655 self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
656 "projects": [str(uuid4())]})
657 self.assertEqual(len(rollback), 0, "Wrong rollback length")
658 self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
659 self.assertIn("can't find default role for user '{}'".format(self.test_name),
660 norm(str(e.exception)), "Wrong exception text")
661
662 def test_conflict_on_edit(self):
663 uid = str(uuid4())
664 with self.subTest(i=1):
665 self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
666 new_name = str(uuid4())
667 with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
668 self.topic.edit(self.fake_session, uid, {"username": new_name})
669 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
670 self.assertIn("username '{}' cannot have an uuid format".format(new_name),
671 norm(str(e.exception)), "Wrong exception text")
672 with self.subTest(i=2):
673 self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
674 self.auth.get_role_list.side_effect = [[], []]
675 with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
676 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
677 self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
678 self.assertIn("can't find a default role for user '{}'".format(self.test_name),
679 norm(str(e.exception)), "Wrong exception text")
680 with self.subTest(i=3):
681 admin_uid = str(uuid4())
682 self.auth.get_user_list.return_value = [{"_id": admin_uid, "username": "admin"}]
683 with self.assertRaises(EngineException, msg="Accepted removing system_admin role from admin user") as e:
684 self.topic.edit(self.fake_session, admin_uid,
685 {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]})
686 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
687 self.assertIn("you cannot remove system_admin role from admin user",
688 norm(str(e.exception)), "Wrong exception text")
689 with self.subTest(i=4):
690 new_name = "new-user-name"
691 self.auth.get_user_list.side_effect = [[{"_id": uid, "name": self.test_name}],
692 [{"_id": str(uuid4()), "name": new_name}]]
693 with self.assertRaises(EngineException, msg="Accepted existing username") as e:
694 self.topic.edit(self.fake_session, uid, {"username": new_name})
695 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
696 self.assertIn("username '{}' is already used".format(new_name),
697 norm(str(e.exception)), "Wrong exception text")
698
699 def test_conflict_on_del(self):
700 with self.subTest(i=1):
701 uid = str(uuid4())
702 self.fake_session["username"] = self.test_name
703 user = user = {"_id": uid, "username": self.test_name, "project_role_mappings": []}
704 self.auth.get_user.return_value = user
705 with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e:
706 self.topic.delete(self.fake_session, uid)
707 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
708 self.assertIn("you cannot delete your own login user", norm(str(e.exception)), "Wrong exception text")
709
710
711 class Test_CommonVimWimSdn(TestCase):
712
713 @classmethod
714 def setUpClass(cls):
715 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
716
717 def setUp(self):
718 self.db = Mock(dbbase.DbBase())
719 self.fs = Mock(fsbase.FsBase())
720 self.msg = Mock(msgbase.MsgBase())
721 self.auth = Mock(authconn.Authconn(None, None, None))
722 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
723 # Use WIM schemas for testing because they are the simplest
724 self.topic._send_msg = Mock()
725 self.topic.topic = "wims"
726 self.topic.schema_new = validation.wim_account_new_schema
727 self.topic.schema_edit = validation.wim_account_edit_schema
728 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
729 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
730 self.topic.check_quota = Mock(return_value=None) # skip quota
731
732 def test_new_cvws(self):
733 test_url = "http://0.0.0.0:0"
734 with self.subTest(i=1):
735 rollback = []
736 test_type = "fake"
737 self.db.get_one.return_value = None
738 self.db.create.side_effect = lambda self, content: content["_id"]
739 cid, oid = self.topic.new(rollback, self.fake_session,
740 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
741 self.assertEqual(len(rollback), 1, "Wrong rollback length")
742 args = self.db.create.call_args[0]
743 content = args[1]
744 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
745 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
746 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
747 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
748 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
749 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
750 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
751 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
752 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
753 self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
754 self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects")
755 self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects")
756 self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation")
757 self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations")
758 operation = content["_admin"]["operations"][0]
759 self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type")
760 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
761 self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time")
762 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["created"],
763 "Wrong operation status enter time")
764 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
765 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
766 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
767 # with self.subTest(i=2):
768 # rollback = []
769 # test_type = "bad_type"
770 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
771 # self.topic.new(rollback, self.fake_session,
772 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
773 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
774 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
775 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
776 # norm(str(e.exception)), "Wrong exception text")
777
778 def test_conflict_on_new(self):
779 with self.subTest(i=1):
780 rollback = []
781 test_url = "http://0.0.0.0:0"
782 test_type = "fake"
783 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
784 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
785 self.topic.new(rollback, self.fake_session,
786 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
787 self.assertEqual(len(rollback), 0, "Wrong rollback length")
788 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
789 self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic),
790 norm(str(e.exception)), "Wrong exception text")
791
792 def test_edit_cvws(self):
793 now = time()
794 cid = str(uuid4())
795 test_url = "http://0.0.0.0:0"
796 test_type = "fake"
797 cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type,
798 "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}}
799 with self.subTest(i=1):
800 new_name = "new-cim-name"
801 new_url = "https://1.1.1.1:1"
802 new_type = "onos"
803 self.db.get_one.side_effect = [cvws, None]
804 self.db.replace.return_value = {"updated": 1}
805 # self.db.encrypt.side_effect = [b64str(), b64str()]
806 self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type})
807 args = self.db.replace.call_args[0]
808 content = args[2]
809 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
810 self.assertEqual(args[1], cid, "Wrong CIM identifier")
811 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
812 self.assertEqual(content["name"], new_name, "Wrong CIM name")
813 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
814 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
815 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
816 self.assertGreater(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
817 self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations")
818 operation = content["_admin"]["operations"][1]
819 self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type")
820 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
821 self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time")
822 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["modified"],
823 "Wrong operation status enter time")
824 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
825 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
826 with self.subTest(i=2):
827 self.db.get_one.side_effect = [cvws]
828 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
829 self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
830 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
831 self.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)").
832 format("extra_prop"),
833 norm(str(e.exception)), "Wrong exception text")
834
835 def test_conflict_on_edit(self):
836 with self.subTest(i=1):
837 cid = str(uuid4())
838 new_name = "new-cim-name"
839 self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name},
840 {"_id": str(uuid4()), "name": new_name}]
841 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
842 self.topic.edit(self.fake_session, cid, {"name": new_name})
843 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
844 self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic),
845 norm(str(e.exception)), "Wrong exception text")
846
847 def test_delete_cvws(self):
848 cid = str(uuid4())
849 ro_pid = str(uuid4())
850 rw_pid = str(uuid4())
851 cvws = {"_id": cid, "name": self.test_name}
852 self.db.get_list.return_value = []
853 with self.subTest(i=1):
854 cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]}
855 self.db.get_one.return_value = cvws
856 oid = self.topic.delete(self.fake_session, cid)
857 self.assertIsNone(oid, "Wrong operation identifier")
858 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
859 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
860 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
861 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
862 self.assertEqual(self.db.set_one.call_args[1]["update_dict"], None,
863 "Wrong read-only projects update")
864 self.assertEqual(self.db.set_one.call_args[1]["pull_list"],
865 {"_admin.projects_read": (test_pid,), "_admin.projects_write": (test_pid,)},
866 "Wrong read/write projects update")
867 self.topic._send_msg.assert_not_called()
868 with self.subTest(i=2):
869 now = time()
870 cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
871 self.db.get_one.return_value = cvws
872 oid = self.topic.delete(self.fake_session, cid)
873 self.assertEqual(oid, cid+":0", "Wrong operation identifier")
874 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
875 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
876 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
877 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier")
878 self.assertEqual(self.db.set_one.call_args[1]["update_dict"], {"_admin.to_delete": True},
879 "Wrong _admin.to_delete update")
880 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
881 self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
882 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
883 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status")
884 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
885 self.assertGreater(operation["startTime"], now, "Wrong operation start time")
886 self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
887 self.topic._send_msg.assert_called_once_with("delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None)
888 with self.subTest(i=3):
889 cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
890 self.db.get_one.return_value = cvws
891 self.topic._send_msg.reset_mock()
892 self.db.get_one.reset_mock()
893 self.db.del_one.reset_mock()
894 self.fake_session["force"] = True # to force deletion
895 self.fake_session["admin"] = True # to force deletion
896 self.fake_session["project_id"] = [] # to force deletion
897 oid = self.topic.delete(self.fake_session, cid)
898 self.assertIsNone(oid, "Wrong operation identifier")
899 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
900 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
901 self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
902 self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
903 self.topic._send_msg.assert_called_once_with("deleted", {"_id": cid, "op_id": None}, not_send_msg=None)
904
905
906 if __name__ == '__main__':
907 unittest.main()