blob: 2c8154e7cb199f2d38acaf0f7b3663531a9dad47 [file] [log] [blame]
delacruzramo79e40f42019-10-10 16:36:40 +02001#! /usr/bin/python3
2# -*- coding: utf-8 -*-
3
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13# implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17__author__ = "Pedro de la Cruz Ramos, pedro.delacruzramos@altran.com"
18__date__ = "$2019-10-019"
19
20import unittest
21from unittest import TestCase
22from unittest.mock import Mock
23from uuid import uuid4
24from http import HTTPStatus
25from time import time
26from random import randint
27from osm_common import dbbase, fsbase, msgbase
28from osm_nbi import authconn, validation
29from osm_nbi.admin_topics import ProjectTopicAuth, RoleTopicAuth, UserTopicAuth, CommonVimWimSdn
30from osm_nbi.engine import EngineException
31from osm_nbi.authconn import AuthconnNotFoundException
32
33
34test_pid = str(uuid4())
35test_name = "test-user"
36
37
38def norm(str):
39 """Normalize string for checking"""
40 return ' '.join(str.strip().split()).lower()
41
42
43class Test_ProjectTopicAuth(TestCase):
44
45 @classmethod
46 def setUpClass(cls):
47 cls.test_name = "test-project-topic"
48
49 def setUp(self):
50 self.db = Mock(dbbase.DbBase())
51 self.fs = Mock(fsbase.FsBase())
52 self.msg = Mock(msgbase.MsgBase())
tierno9e87a7f2020-03-23 09:24:10 +000053 self.auth = Mock(authconn.Authconn(None, None, None))
delacruzramo79e40f42019-10-10 16:36:40 +020054 self.topic = ProjectTopicAuth(self.db, self.fs, self.msg, self.auth)
55 self.fake_session = {"username": self.test_name, "project_id": (test_pid,), "method": None,
56 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
tiernod7749582020-05-28 10:41:10 +000057 self.topic.check_quota = Mock(return_value=None) # skip quota
delacruzramo79e40f42019-10-10 16:36:40 +020058
59 def test_new_project(self):
60 with self.subTest(i=1):
61 rollback = []
62 pid1 = str(uuid4())
63 self.auth.get_project_list.return_value = []
64 self.auth.create_project.return_value = pid1
65 pid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "quotas": {}})
66 self.assertEqual(len(rollback), 1, "Wrong rollback length")
67 self.assertEqual(pid2, pid1, "Wrong project identifier")
68 content = self.auth.create_project.call_args[0][0]
69 self.assertEqual(content["name"], self.test_name, "Wrong project name")
70 self.assertEqual(content["quotas"], {}, "Wrong quotas")
71 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
72 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
73 with self.subTest(i=2):
74 rollback = []
75 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
76 self.topic.new(rollback, self.fake_session, {"name": "other-project-name", "quotas": {"baditems": 10}})
77 self.assertEqual(len(rollback), 0, "Wrong rollback length")
78 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
79 self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
80 .format("baditems"), norm(str(e.exception)), "Wrong exception text")
81
82 def test_edit_project(self):
83 now = time()
84 pid = str(uuid4())
85 proj = {"_id": pid, "name": self.test_name, "_admin": {"created": now, "modified": now}}
86 with self.subTest(i=1):
87 self.auth.get_project_list.side_effect = [[proj], []]
88 new_name = "new-project-name"
89 quotas = {"vnfds": randint(0, 100), "nsds": randint(0, 100)}
90 self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
91 _id, content = self.auth.update_project.call_args[0]
92 self.assertEqual(_id, pid, "Wrong project identifier")
93 self.assertEqual(content["_id"], pid, "Wrong project identifier")
94 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
95 self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
96 self.assertEqual(content["name"], new_name, "Wrong project name")
97 self.assertEqual(content["quotas"], quotas, "Wrong quotas")
98 with self.subTest(i=2):
99 new_name = "other-project-name"
100 quotas = {"baditems": randint(0, 100)}
101 self.auth.get_project_list.side_effect = [[proj], []]
102 with self.assertRaises(EngineException, msg="Accepted wrong quotas") as e:
103 self.topic.edit(self.fake_session, pid, {"name": new_name, "quotas": quotas})
104 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
105 self.assertIn("format error at 'quotas' 'additional properties are not allowed ('{}' was unexpected)'"
106 .format("baditems"), norm(str(e.exception)), "Wrong exception text")
107
108 def test_conflict_on_new(self):
109 with self.subTest(i=1):
110 rollback = []
111 pid = str(uuid4())
112 with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
113 self.topic.new(rollback, self.fake_session, {"name": pid})
114 self.assertEqual(len(rollback), 0, "Wrong rollback length")
115 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
116 self.assertIn("project name '{}' cannot have an uuid format".format(pid),
117 norm(str(e.exception)), "Wrong exception text")
118 with self.subTest(i=2):
119 rollback = []
120 self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
121 with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
122 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
123 self.assertEqual(len(rollback), 0, "Wrong rollback length")
124 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
125 self.assertIn("project '{}' exists".format(self.test_name),
126 norm(str(e.exception)), "Wrong exception text")
127
128 def test_conflict_on_edit(self):
129 with self.subTest(i=1):
130 self.auth.get_project_list.return_value = [{"_id": test_pid, "name": self.test_name}]
131 new_name = str(uuid4())
132 with self.assertRaises(EngineException, msg="Accepted uuid as project name") as e:
133 self.topic.edit(self.fake_session, test_pid, {"name": new_name})
134 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
135 self.assertIn("project name '{}' cannot have an uuid format".format(new_name),
136 norm(str(e.exception)), "Wrong exception text")
137 with self.subTest(i=2):
138 pid = str(uuid4())
139 self.auth.get_project_list.return_value = [{"_id": pid, "name": "admin"}]
140 with self.assertRaises(EngineException, msg="Accepted renaming of project 'admin'") as e:
141 self.topic.edit(self.fake_session, pid, {"name": "new-name"})
142 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
143 self.assertIn("you cannot rename project 'admin'",
144 norm(str(e.exception)), "Wrong exception text")
145 with self.subTest(i=3):
146 new_name = "new-project-name"
147 self.auth.get_project_list.side_effect = [[{"_id": test_pid, "name": self.test_name}],
148 [{"_id": str(uuid4()), "name": new_name}]]
149 with self.assertRaises(EngineException, msg="Accepted existing project name") as e:
150 self.topic.edit(self.fake_session, pid, {"name": new_name})
151 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
152 self.assertIn("project '{}' is already used".format(new_name),
153 norm(str(e.exception)), "Wrong exception text")
154
155 def test_delete_project(self):
156 with self.subTest(i=1):
157 pid = str(uuid4())
158 self.auth.get_project.return_value = {"_id": pid, "name": "other-project-name"}
159 self.auth.delete_project.return_value = {"deleted": 1}
160 self.auth.get_user_list.return_value = []
161 self.db.get_list.return_value = []
162 rc = self.topic.delete(self.fake_session, pid)
163 self.assertEqual(rc, {"deleted": 1}, "Wrong project deletion return info")
164 self.assertEqual(self.auth.get_project.call_args[0][0], pid, "Wrong project identifier")
165 self.assertEqual(self.auth.delete_project.call_args[0][0], pid, "Wrong project identifier")
166
167 def test_conflict_on_del(self):
168 with self.subTest(i=1):
169 self.auth.get_project.return_value = {"_id": test_pid, "name": self.test_name}
170 with self.assertRaises(EngineException, msg="Accepted deletion of own project") as e:
171 self.topic.delete(self.fake_session, self.test_name)
172 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
173 self.assertIn("you cannot delete your own project", norm(str(e.exception)), "Wrong exception text")
174 with self.subTest(i=2):
175 self.auth.get_project.return_value = {"_id": str(uuid4()), "name": "admin"}
176 with self.assertRaises(EngineException, msg="Accepted deletion of project 'admin'") as e:
177 self.topic.delete(self.fake_session, "admin")
178 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
179 self.assertIn("you cannot delete project 'admin'", norm(str(e.exception)), "Wrong exception text")
180 with self.subTest(i=3):
181 pid = str(uuid4())
182 name = "other-project-name"
183 self.auth.get_project.return_value = {"_id": pid, "name": name}
184 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
185 "project_role_mappings": [{"project": pid, "role": str(uuid4())}]}]
186 with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
187 self.topic.delete(self.fake_session, pid)
188 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
189 self.assertIn("project '{}' ({}) is being used by user '{}'".format(name, pid, self.test_name),
190 norm(str(e.exception)), "Wrong exception text")
191 with self.subTest(i=4):
192 self.auth.get_user_list.return_value = []
193 self.db.get_list.return_value = [{"_id": str(uuid4()), "id": self.test_name,
194 "_admin": {"projects_read": [pid], "projects_write": []}}]
195 with self.assertRaises(EngineException, msg="Accepted deletion of used project") as e:
196 self.topic.delete(self.fake_session, pid)
197 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
198 self.assertIn("project '{}' ({}) is being used by {} '{}'"
199 .format(name, pid, "vnf descriptor", self.test_name),
200 norm(str(e.exception)), "Wrong exception text")
201
202
203class Test_RoleTopicAuth(TestCase):
204
205 @classmethod
206 def setUpClass(cls):
207 cls.test_name = "test-role-topic"
208 cls.test_operations = ["tokens:get"]
209
210 def setUp(self):
211 self.db = Mock(dbbase.DbBase())
212 self.fs = Mock(fsbase.FsBase())
213 self.msg = Mock(msgbase.MsgBase())
tierno9e87a7f2020-03-23 09:24:10 +0000214 self.auth = Mock(authconn.Authconn(None, None, None))
215 self.auth.role_permissions = self.test_operations
216 self.topic = RoleTopicAuth(self.db, self.fs, self.msg, self.auth)
delacruzramo79e40f42019-10-10 16:36:40 +0200217 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
218 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
tiernod7749582020-05-28 10:41:10 +0000219 self.topic.check_quota = Mock(return_value=None) # skip quota
delacruzramo79e40f42019-10-10 16:36:40 +0200220
221 def test_new_role(self):
222 with self.subTest(i=1):
223 rollback = []
224 rid1 = str(uuid4())
225 perms_in = {"tokens": True}
226 perms_out = {"default": False, "admin": False, "tokens": True}
227 self.auth.get_role_list.return_value = []
228 self.auth.create_role.return_value = rid1
229 rid2, oid = self.topic.new(rollback, self.fake_session, {"name": self.test_name, "permissions": perms_in})
230 self.assertEqual(len(rollback), 1, "Wrong rollback length")
231 self.assertEqual(rid2, rid1, "Wrong project identifier")
232 content = self.auth.create_role.call_args[0][0]
233 self.assertEqual(content["name"], self.test_name, "Wrong role name")
234 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
235 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
236 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
237 with self.subTest(i=2):
238 rollback = []
239 with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
240 self.topic.new(rollback, self.fake_session,
241 {"name": "other-role-name", "permissions": {"projects": True}})
242 self.assertEqual(len(rollback), 0, "Wrong rollback length")
243 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
244 self.assertIn("invalid permission '{}'".format("projects"),
245 norm(str(e.exception)), "Wrong exception text")
246
247 def test_edit_role(self):
248 now = time()
249 rid = str(uuid4())
250 role = {"_id": rid, "name": self.test_name, "permissions": {"tokens": True},
251 "_admin": {"created": now, "modified": now}}
252 with self.subTest(i=1):
253 self.auth.get_role_list.side_effect = [[role], []]
254 self.auth.get_role.return_value = role
255 new_name = "new-role-name"
256 perms_in = {"tokens": False, "tokens:get": True}
257 perms_out = {"default": False, "admin": False, "tokens": False, "tokens:get": True}
258 self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
259 content = self.auth.update_role.call_args[0][0]
260 self.assertEqual(content["_id"], rid, "Wrong role identifier")
261 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
262 self.assertGreater(content["_admin"]["modified"], now, "Wrong modification time")
263 self.assertEqual(content["name"], new_name, "Wrong role name")
264 self.assertEqual(content["permissions"], perms_out, "Wrong permissions")
265 with self.subTest(i=2):
266 new_name = "other-role-name"
267 perms_in = {"tokens": False, "tokens:post": True}
268 self.auth.get_role_list.side_effect = [[role], []]
269 with self.assertRaises(EngineException, msg="Accepted wrong permissions") as e:
270 self.topic.edit(self.fake_session, rid, {"name": new_name, "permissions": perms_in})
271 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
272 self.assertIn("invalid permission '{}'".format("tokens:post"),
273 norm(str(e.exception)), "Wrong exception text")
274
275 def test_delete_role(self):
276 with self.subTest(i=1):
277 rid = str(uuid4())
278 role = {"_id": rid, "name": "other-role-name"}
279 self.auth.get_role_list.return_value = [role]
280 self.auth.get_role.return_value = role
281 self.auth.delete_role.return_value = {"deleted": 1}
282 self.auth.get_user_list.return_value = []
283 rc = self.topic.delete(self.fake_session, rid)
284 self.assertEqual(rc, {"deleted": 1}, "Wrong role deletion return info")
285 self.assertEqual(self.auth.get_role_list.call_args[0][0]["_id"], rid, "Wrong role identifier")
286 self.assertEqual(self.auth.get_role.call_args[0][0], rid, "Wrong role identifier")
287 self.assertEqual(self.auth.delete_role.call_args[0][0], rid, "Wrong role identifier")
288
289 def test_conflict_on_new(self):
290 with self.subTest(i=1):
291 rollback = []
292 rid = str(uuid4())
293 with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
294 self.topic.new(rollback, self.fake_session, {"name": rid})
295 self.assertEqual(len(rollback), 0, "Wrong rollback length")
296 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
297 self.assertIn("role name '{}' cannot have an uuid format".format(rid),
298 norm(str(e.exception)), "Wrong exception text")
299 with self.subTest(i=2):
300 rollback = []
301 self.auth.get_role_list.return_value = [{"_id": str(uuid4()), "name": self.test_name}]
302 with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
303 self.topic.new(rollback, self.fake_session, {"name": self.test_name})
304 self.assertEqual(len(rollback), 0, "Wrong rollback length")
305 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
306 self.assertIn("role name '{}' exists".format(self.test_name),
307 norm(str(e.exception)), "Wrong exception text")
308
309 def test_conflict_on_edit(self):
310 rid = str(uuid4())
311 with self.subTest(i=1):
312 self.auth.get_role_list.return_value = [{"_id": rid, "name": self.test_name, "permissions": {}}]
313 new_name = str(uuid4())
314 with self.assertRaises(EngineException, msg="Accepted uuid as role name") as e:
315 self.topic.edit(self.fake_session, rid, {"name": new_name})
316 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
317 self.assertIn("role name '{}' cannot have an uuid format".format(new_name),
318 norm(str(e.exception)), "Wrong exception text")
319 for i, role_name in enumerate(["system_admin", "project_admin"], start=2):
320 with self.subTest(i=i):
321 rid = str(uuid4())
322 self.auth.get_role.return_value = {"_id": rid, "name": role_name, "permissions": {}}
323 with self.assertRaises(EngineException, msg="Accepted renaming of role '{}'".format(role_name)) as e:
324 self.topic.edit(self.fake_session, rid, {"name": "new-name"})
325 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
326 self.assertIn("you cannot rename role '{}'".format(role_name),
327 norm(str(e.exception)), "Wrong exception text")
328 with self.subTest(i=i+1):
329 new_name = "new-role-name"
330 self.auth.get_role_list.side_effect = [[{"_id": rid, "name": self.test_name, "permissions": {}}],
331 [{"_id": str(uuid4()), "name": new_name, "permissions": {}}]]
332 self.auth.get_role.return_value = {"_id": rid, "name": self.test_name, "permissions": {}}
333 with self.assertRaises(EngineException, msg="Accepted existing role name") as e:
334 self.topic.edit(self.fake_session, rid, {"name": new_name})
335 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
336 self.assertIn("role name '{}' exists".format(new_name),
337 norm(str(e.exception)), "Wrong exception text")
338
339 def test_conflict_on_del(self):
340 for i, role_name in enumerate(["system_admin", "project_admin"], start=1):
341 with self.subTest(i=i):
342 rid = str(uuid4())
343 role = {"_id": rid, "name": role_name}
344 self.auth.get_role_list.return_value = [role]
345 self.auth.get_role.return_value = role
346 with self.assertRaises(EngineException, msg="Accepted deletion of role '{}'".format(role_name)) as e:
347 self.topic.delete(self.fake_session, rid)
348 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
349 self.assertIn("you cannot delete role '{}'".format(role_name),
350 norm(str(e.exception)), "Wrong exception text")
351 with self.subTest(i=i+1):
352 rid = str(uuid4())
353 name = "other-role-name"
354 role = {"_id": rid, "name": name}
355 self.auth.get_role_list.return_value = [role]
356 self.auth.get_role.return_value = role
357 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name,
358 "project_role_mappings": [{"project": str(uuid4()), "role": rid}]}]
359 with self.assertRaises(EngineException, msg="Accepted deletion of used role") as e:
360 self.topic.delete(self.fake_session, rid)
361 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
362 self.assertIn("role '{}' ({}) is being used by user '{}'".format(name, rid, self.test_name),
363 norm(str(e.exception)), "Wrong exception text")
364
365
366class Test_UserTopicAuth(TestCase):
367
368 @classmethod
369 def setUpClass(cls):
370 cls.test_name = "test-user-topic"
371
372 def setUp(self):
373 self.db = Mock(dbbase.DbBase())
374 self.fs = Mock(fsbase.FsBase())
375 self.msg = Mock(msgbase.MsgBase())
tierno9e87a7f2020-03-23 09:24:10 +0000376 self.auth = Mock(authconn.Authconn(None, None, None))
delacruzramo79e40f42019-10-10 16:36:40 +0200377 self.topic = UserTopicAuth(self.db, self.fs, self.msg, self.auth)
378 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
379 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
tiernod7749582020-05-28 10:41:10 +0000380 self.topic.check_quota = Mock(return_value=None) # skip quota
delacruzramo79e40f42019-10-10 16:36:40 +0200381
382 def test_new_user(self):
383 uid1 = str(uuid4())
384 pid = str(uuid4())
385 self.auth.get_user_list.return_value = []
386 self.auth.get_project.return_value = {"_id": pid, "name": "some_project"}
387 self.auth.create_user.return_value = {"_id": uid1, "username": self.test_name}
388 with self.subTest(i=1):
389 rollback = []
390 rid = str(uuid4())
391 self.auth.get_role.return_value = {"_id": rid, "name": "some_role"}
392 prms_in = [{"project": "some_project", "role": "some_role"}]
393 prms_out = [{"project": pid, "role": rid}]
394 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
395 "password": self.test_name,
396 "project_role_mappings": prms_in
397 })
398 self.assertEqual(len(rollback), 1, "Wrong rollback length")
399 self.assertEqual(uid2, uid1, "Wrong project identifier")
400 content = self.auth.create_user.call_args[0][0]
401 self.assertEqual(content["username"], self.test_name, "Wrong project name")
402 self.assertEqual(content["password"], self.test_name, "Wrong password")
403 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
404 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
405 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
406 with self.subTest(i=2):
407 rollback = []
408 def_rid = str(uuid4())
409 def_role = {"_id": def_rid, "name": "project_admin"}
410 self.auth.get_role.return_value = def_role
411 self.auth.get_role_list.return_value = [def_role]
412 prms_out = [{"project": pid, "role": def_rid}]
413 uid2, oid = self.topic.new(rollback, self.fake_session, {"username": self.test_name,
414 "password": self.test_name,
415 "projects": ["some_project"]
416 })
417 self.assertEqual(len(rollback), 1, "Wrong rollback length")
418 self.assertEqual(uid2, uid1, "Wrong project identifier")
419 content = self.auth.create_user.call_args[0][0]
420 self.assertEqual(content["username"], self.test_name, "Wrong project name")
421 self.assertEqual(content["password"], self.test_name, "Wrong password")
422 self.assertEqual(content["project_role_mappings"], prms_out, "Wrong project-role mappings")
423 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
424 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
425 with self.subTest(i=3):
426 rollback = []
427 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
428 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
429 "password": "other-password",
430 "project_role_mappings": [{}]
431 })
432 self.assertEqual(len(rollback), 0, "Wrong rollback length")
433 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
434 self.assertIn("format error at '{}' '{}'"
435 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
436 norm(str(e.exception)), "Wrong exception text")
437 with self.subTest(i=4):
438 rollback = []
439 with self.assertRaises(EngineException, msg="Accepted wrong projects") as e:
440 self.topic.new(rollback, self.fake_session, {"username": "other-project-name",
441 "password": "other-password",
442 "projects": []
443 })
444 self.assertEqual(len(rollback), 0, "Wrong rollback length")
445 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
446 self.assertIn("format error at '{}' '{}'" .format("projects", "{} is too short").format([]),
447 norm(str(e.exception)), "Wrong exception text")
448
449 def test_edit_user(self):
450 now = time()
451 uid = str(uuid4())
452 pid1 = str(uuid4())
453 rid1 = str(uuid4())
454 prms = [{"project": pid1, "project_name": "project-1", "role": rid1, "role_name": "role-1"}]
455 user = {"_id": uid, "username": self.test_name, "project_role_mappings": prms,
456 "_admin": {"created": now, "modified": now}}
457 with self.subTest(i=1):
458 self.auth.get_user_list.side_effect = [[user], []]
459 self.auth.get_user.return_value = user
460 pid2 = str(uuid4())
461 rid2 = str(uuid4())
462 self.auth.get_project.side_effect = [{"_id": pid2, "name": "project-2"},
463 {"_id": pid1, "name": "project-1"}]
464 self.auth.get_role.side_effect = [{"_id": rid2, "name": "role-2"},
465 {"_id": rid1, "name": "role-1"}]
466 new_name = "new-user-name"
467 new_pasw = "new-password"
468 add_prms = [{"project": pid2, "role": rid2}]
469 rem_prms = [{"project": pid1, "role": rid1}]
470 self.topic.edit(self.fake_session, uid, {"username": new_name, "password": new_pasw,
471 "add_project_role_mappings": add_prms,
472 "remove_project_role_mappings": rem_prms
473 })
474 content = self.auth.update_user.call_args[0][0]
475 self.assertEqual(content["_id"], uid, "Wrong user identifier")
476 self.assertEqual(content["username"], new_name, "Wrong user name")
477 self.assertEqual(content["password"], new_pasw, "Wrong user password")
478 self.assertEqual(content["add_project_role_mappings"], add_prms, "Wrong project-role mappings to add")
479 self.assertEqual(content["remove_project_role_mappings"], prms, "Wrong project-role mappings to remove")
480 with self.subTest(i=2):
481 new_name = "other-user-name"
482 new_prms = [{}]
483 self.auth.get_role_list.side_effect = [[user], []]
Frank Brydendeba68e2020-07-27 13:55:11 +0000484 self.auth.get_user_list.side_effect = [[user]]
delacruzramo79e40f42019-10-10 16:36:40 +0200485 with self.assertRaises(EngineException, msg="Accepted wrong project-role mappings") as e:
486 self.topic.edit(self.fake_session, uid, {"username": new_name, "project_role_mappings": new_prms})
487 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
488 self.assertIn("format error at '{}' '{}'"
489 .format("project_role_mappings:{}", "'{}' is a required property").format(0, "project"),
490 norm(str(e.exception)), "Wrong exception text")
491
492 def test_delete_user(self):
493 with self.subTest(i=1):
494 uid = str(uuid4())
495 self.fake_session["username"] = self.test_name
496 user = user = {"_id": uid, "username": "other-user-name", "project_role_mappings": []}
497 self.auth.get_user.return_value = user
498 self.auth.delete_user.return_value = {"deleted": 1}
499 rc = self.topic.delete(self.fake_session, uid)
500 self.assertEqual(rc, {"deleted": 1}, "Wrong user deletion return info")
501 self.assertEqual(self.auth.get_user.call_args[0][0], uid, "Wrong user identifier")
502 self.assertEqual(self.auth.delete_user.call_args[0][0], uid, "Wrong user identifier")
503
504 def test_conflict_on_new(self):
505 with self.subTest(i=1):
506 rollback = []
507 uid = str(uuid4())
508 with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
509 self.topic.new(rollback, self.fake_session, {"username": uid, "password": self.test_name,
510 "projects": [test_pid]})
511 self.assertEqual(len(rollback), 0, "Wrong rollback length")
512 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
513 self.assertIn("username '{}' cannot have a uuid format".format(uid),
514 norm(str(e.exception)), "Wrong exception text")
515 with self.subTest(i=2):
516 rollback = []
517 self.auth.get_user_list.return_value = [{"_id": str(uuid4()), "username": self.test_name}]
518 with self.assertRaises(EngineException, msg="Accepted existing username") as e:
519 self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
520 "projects": [test_pid]})
521 self.assertEqual(len(rollback), 0, "Wrong rollback length")
522 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
523 self.assertIn("username '{}' is already used".format(self.test_name),
524 norm(str(e.exception)), "Wrong exception text")
525 with self.subTest(i=3):
526 rollback = []
527 self.auth.get_user_list.return_value = []
528 self.auth.get_role_list.side_effect = [[], []]
529 with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
530 self.topic.new(rollback, self.fake_session, {"username": self.test_name, "password": self.test_name,
531 "projects": [str(uuid4())]})
532 self.assertEqual(len(rollback), 0, "Wrong rollback length")
533 self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
534 self.assertIn("can't find default role for user '{}'".format(self.test_name),
535 norm(str(e.exception)), "Wrong exception text")
536
537 def test_conflict_on_edit(self):
538 uid = str(uuid4())
539 with self.subTest(i=1):
540 self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
541 new_name = str(uuid4())
542 with self.assertRaises(EngineException, msg="Accepted uuid as username") as e:
543 self.topic.edit(self.fake_session, uid, {"username": new_name})
544 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
545 self.assertIn("username '{}' cannot have an uuid format".format(new_name),
546 norm(str(e.exception)), "Wrong exception text")
547 with self.subTest(i=2):
548 self.auth.get_user_list.return_value = [{"_id": uid, "username": self.test_name}]
549 self.auth.get_role_list.side_effect = [[], []]
550 with self.assertRaises(AuthconnNotFoundException, msg="Accepted user without default role") as e:
551 self.topic.edit(self.fake_session, uid, {"projects": [str(uuid4())]})
552 self.assertEqual(e.exception.http_code, HTTPStatus.NOT_FOUND, "Wrong HTTP status code")
553 self.assertIn("can't find a default role for user '{}'".format(self.test_name),
554 norm(str(e.exception)), "Wrong exception text")
555 with self.subTest(i=3):
556 admin_uid = str(uuid4())
557 self.auth.get_user_list.return_value = [{"_id": admin_uid, "username": "admin"}]
558 with self.assertRaises(EngineException, msg="Accepted removing system_admin role from admin user") as e:
559 self.topic.edit(self.fake_session, admin_uid,
560 {"remove_project_role_mappings": [{"project": "admin", "role": "system_admin"}]})
561 self.assertEqual(e.exception.http_code, HTTPStatus.FORBIDDEN, "Wrong HTTP status code")
562 self.assertIn("you cannot remove system_admin role from admin user",
563 norm(str(e.exception)), "Wrong exception text")
564 with self.subTest(i=4):
565 new_name = "new-user-name"
566 self.auth.get_user_list.side_effect = [[{"_id": uid, "name": self.test_name}],
567 [{"_id": str(uuid4()), "name": new_name}]]
568 with self.assertRaises(EngineException, msg="Accepted existing username") as e:
569 self.topic.edit(self.fake_session, uid, {"username": new_name})
570 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
571 self.assertIn("username '{}' is already used".format(new_name),
572 norm(str(e.exception)), "Wrong exception text")
573
574 def test_conflict_on_del(self):
575 with self.subTest(i=1):
576 uid = str(uuid4())
577 self.fake_session["username"] = self.test_name
578 user = user = {"_id": uid, "username": self.test_name, "project_role_mappings": []}
579 self.auth.get_user.return_value = user
580 with self.assertRaises(EngineException, msg="Accepted deletion of own user") as e:
581 self.topic.delete(self.fake_session, uid)
582 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
583 self.assertIn("you cannot delete your own login user", norm(str(e.exception)), "Wrong exception text")
584
585
586class Test_CommonVimWimSdn(TestCase):
587
588 @classmethod
589 def setUpClass(cls):
590 cls.test_name = "test-cim-topic" # CIM = Common Infrastructure Manager
591
592 def setUp(self):
593 self.db = Mock(dbbase.DbBase())
594 self.fs = Mock(fsbase.FsBase())
595 self.msg = Mock(msgbase.MsgBase())
tierno9e87a7f2020-03-23 09:24:10 +0000596 self.auth = Mock(authconn.Authconn(None, None, None))
delacruzramo79e40f42019-10-10 16:36:40 +0200597 self.topic = CommonVimWimSdn(self.db, self.fs, self.msg, self.auth)
598 # Use WIM schemas for testing because they are the simplest
tiernof5f2e3f2020-03-23 14:42:10 +0000599 self.topic._send_msg = Mock()
delacruzramo79e40f42019-10-10 16:36:40 +0200600 self.topic.topic = "wims"
601 self.topic.schema_new = validation.wim_account_new_schema
602 self.topic.schema_edit = validation.wim_account_edit_schema
603 self.fake_session = {"username": test_name, "project_id": (test_pid,), "method": None,
604 "admin": True, "force": False, "public": False, "allow_show_user_project_role": True}
tiernod7749582020-05-28 10:41:10 +0000605 self.topic.check_quota = Mock(return_value=None) # skip quota
delacruzramo79e40f42019-10-10 16:36:40 +0200606
607 def test_new_cvws(self):
608 test_url = "http://0.0.0.0:0"
609 with self.subTest(i=1):
610 rollback = []
611 test_type = "fake"
612 self.db.get_one.return_value = None
613 self.db.create.side_effect = lambda self, content: content["_id"]
614 cid, oid = self.topic.new(rollback, self.fake_session,
615 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
616 self.assertEqual(len(rollback), 1, "Wrong rollback length")
617 args = self.db.create.call_args[0]
618 content = args[1]
619 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
620 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
621 self.assertEqual(content["name"], self.test_name, "Wrong CIM name")
622 self.assertEqual(content["wim_url"], test_url, "Wrong URL")
623 self.assertEqual(content["wim_type"], test_type, "Wrong CIM type")
624 self.assertEqual(content["schema_version"], "1.11", "Wrong schema version")
625 self.assertEqual(content["op_id"], oid, "Wrong operation identifier")
626 self.assertIsNotNone(content["_admin"]["created"], "Wrong creation time")
627 self.assertEqual(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
628 self.assertEqual(content["_admin"]["operationalState"], "PROCESSING", "Wrong operational state")
629 self.assertEqual(content["_admin"]["projects_read"], [test_pid], "Wrong read-only projects")
630 self.assertEqual(content["_admin"]["projects_write"], [test_pid], "Wrong read/write projects")
631 self.assertIsNone(content["_admin"]["current_operation"], "Wrong current operation")
632 self.assertEqual(len(content["_admin"]["operations"]), 1, "Wrong number of operations")
633 operation = content["_admin"]["operations"][0]
634 self.assertEqual(operation["lcmOperationType"], "create", "Wrong operation type")
635 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
636 self.assertGreater(operation["startTime"], content["_admin"]["created"], "Wrong operation start time")
637 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["created"],
638 "Wrong operation status enter time")
639 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
640 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
tiernob3d0a0e2019-11-13 15:57:51 +0000641 # This test is disabled. From Feature 8030 we admit all WIM/SDN types
642 # with self.subTest(i=2):
643 # rollback = []
644 # test_type = "bad_type"
645 # with self.assertRaises(EngineException, msg="Accepted wrong CIM type") as e:
646 # self.topic.new(rollback, self.fake_session,
647 # {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
648 # self.assertEqual(len(rollback), 0, "Wrong rollback length")
649 # self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
650 # self.assertIn("format error at '{}' '{}".format("wim_type", "'{}' is not one of {}").format(test_type,""),
651 # norm(str(e.exception)), "Wrong exception text")
delacruzramo79e40f42019-10-10 16:36:40 +0200652
653 def test_conflict_on_new(self):
654 with self.subTest(i=1):
655 rollback = []
656 test_url = "http://0.0.0.0:0"
657 test_type = "fake"
658 self.db.get_one.return_value = {"_id": str(uuid4()), "name": self.test_name}
659 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
660 self.topic.new(rollback, self.fake_session,
661 {"name": self.test_name, "wim_url": test_url, "wim_type": test_type})
662 self.assertEqual(len(rollback), 0, "Wrong rollback length")
663 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
664 self.assertIn("name '{}' already exists for {}".format(self.test_name, self.topic.topic),
665 norm(str(e.exception)), "Wrong exception text")
666
667 def test_edit_cvws(self):
668 now = time()
669 cid = str(uuid4())
670 test_url = "http://0.0.0.0:0"
671 test_type = "fake"
672 cvws = {"_id": cid, "name": self.test_name, "wim_url": test_url, "wim_type": test_type,
673 "_admin": {"created": now, "modified": now, "operations": [{"lcmOperationType": "create"}]}}
674 with self.subTest(i=1):
675 new_name = "new-cim-name"
676 new_url = "https://1.1.1.1:1"
677 new_type = "onos"
678 self.db.get_one.side_effect = [cvws, None]
679 self.db.replace.return_value = {"updated": 1}
680 # self.db.encrypt.side_effect = [b64str(), b64str()]
681 self.topic.edit(self.fake_session, cid, {"name": new_name, "wim_url": new_url, "wim_type": new_type})
682 args = self.db.replace.call_args[0]
683 content = args[2]
684 self.assertEqual(args[0], self.topic.topic, "Wrong topic")
685 self.assertEqual(args[1], cid, "Wrong CIM identifier")
686 self.assertEqual(content["_id"], cid, "Wrong CIM identifier")
687 self.assertEqual(content["name"], new_name, "Wrong CIM name")
688 self.assertEqual(content["wim_type"], new_type, "Wrong CIM type")
689 self.assertEqual(content["wim_url"], new_url, "Wrong URL")
690 self.assertEqual(content["_admin"]["created"], now, "Wrong creation time")
691 self.assertGreater(content["_admin"]["modified"], content["_admin"]["created"], "Wrong modification time")
692 self.assertEqual(len(content["_admin"]["operations"]), 2, "Wrong number of operations")
693 operation = content["_admin"]["operations"][1]
694 self.assertEqual(operation["lcmOperationType"], "edit", "Wrong operation type")
695 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
696 self.assertGreater(operation["startTime"], content["_admin"]["modified"], "Wrong operation start time")
697 self.assertGreater(operation["statusEnteredTime"], content["_admin"]["modified"],
698 "Wrong operation status enter time")
699 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status info")
700 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
701 with self.subTest(i=2):
Frank Brydendeba68e2020-07-27 13:55:11 +0000702 self.db.get_one.side_effect = [cvws]
delacruzramo79e40f42019-10-10 16:36:40 +0200703 with self.assertRaises(EngineException, msg="Accepted wrong property") as e:
704 self.topic.edit(self.fake_session, str(uuid4()), {"name": "new-name", "extra_prop": "anything"})
705 self.assertEqual(e.exception.http_code, HTTPStatus.UNPROCESSABLE_ENTITY, "Wrong HTTP status code")
706 self.assertIn("format error '{}'".format("additional properties are not allowed ('{}' was unexpected)").
707 format("extra_prop"),
708 norm(str(e.exception)), "Wrong exception text")
709
710 def test_conflict_on_edit(self):
711 with self.subTest(i=1):
712 cid = str(uuid4())
713 new_name = "new-cim-name"
714 self.db.get_one.side_effect = [{"_id": cid, "name": self.test_name},
715 {"_id": str(uuid4()), "name": new_name}]
716 with self.assertRaises(EngineException, msg="Accepted existing CIM name") as e:
717 self.topic.edit(self.fake_session, cid, {"name": new_name})
718 self.assertEqual(e.exception.http_code, HTTPStatus.CONFLICT, "Wrong HTTP status code")
719 self.assertIn("name '{}' already exists for {}".format(new_name, self.topic.topic),
720 norm(str(e.exception)), "Wrong exception text")
721
722 def test_delete_cvws(self):
723 cid = str(uuid4())
724 ro_pid = str(uuid4())
725 rw_pid = str(uuid4())
726 cvws = {"_id": cid, "name": self.test_name}
delacruzramo35c998b2019-11-21 11:09:16 +0100727 self.db.get_list.return_value = []
delacruzramo79e40f42019-10-10 16:36:40 +0200728 with self.subTest(i=1):
729 cvws["_admin"] = {"projects_read": [test_pid, ro_pid, rw_pid], "projects_write": [test_pid, rw_pid]}
730 self.db.get_one.return_value = cvws
731 oid = self.topic.delete(self.fake_session, cid)
732 self.assertIsNone(oid, "Wrong operation identifier")
733 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
734 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
735 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
736 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
tiernof5f2e3f2020-03-23 14:42:10 +0000737 self.assertEqual(self.db.set_one.call_args[1]["update_dict"], None,
delacruzramo79e40f42019-10-10 16:36:40 +0200738 "Wrong read-only projects update")
tierno20e74d22020-06-22 12:17:22 +0000739 self.assertEqual(self.db.set_one.call_args[1]["pull_list"],
740 {"_admin.projects_read": (test_pid,), "_admin.projects_write": (test_pid,)},
delacruzramo79e40f42019-10-10 16:36:40 +0200741 "Wrong read/write projects update")
tiernof5f2e3f2020-03-23 14:42:10 +0000742 self.topic._send_msg.assert_not_called()
delacruzramo35c998b2019-11-21 11:09:16 +0100743 with self.subTest(i=2):
delacruzramo79e40f42019-10-10 16:36:40 +0200744 now = time()
745 cvws["_admin"] = {"projects_read": [test_pid], "projects_write": [test_pid], "operations": []}
746 self.db.get_one.return_value = cvws
747 oid = self.topic.delete(self.fake_session, cid)
748 self.assertEqual(oid, cid+":0", "Wrong operation identifier")
749 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
750 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
751 self.assertEqual(self.db.set_one.call_args[0][0], self.topic.topic, "Wrong topic")
752 self.assertEqual(self.db.set_one.call_args[0][1]["_id"], cid, "Wrong user identifier")
tiernof5f2e3f2020-03-23 14:42:10 +0000753 self.assertEqual(self.db.set_one.call_args[1]["update_dict"], {"_admin.to_delete": True},
754 "Wrong _admin.to_delete update")
delacruzramo79e40f42019-10-10 16:36:40 +0200755 operation = self.db.set_one.call_args[1]["push"]["_admin.operations"]
756 self.assertEqual(operation["lcmOperationType"], "delete", "Wrong operation type")
757 self.assertEqual(operation["operationState"], "PROCESSING", "Wrong operation state")
758 self.assertEqual(operation["detailed-status"], "", "Wrong operation detailed status")
759 self.assertIsNone(operation["operationParams"], "Wrong operation parameters")
760 self.assertGreater(operation["startTime"], now, "Wrong operation start time")
761 self.assertGreater(operation["statusEnteredTime"], now, "Wrong operation status enter time")
tiernof5f2e3f2020-03-23 14:42:10 +0000762 self.topic._send_msg.assert_called_once_with("delete", {"_id": cid, "op_id": cid + ":0"}, not_send_msg=None)
delacruzramo79e40f42019-10-10 16:36:40 +0200763 with self.subTest(i=3):
764 cvws["_admin"] = {"projects_read": [], "projects_write": [], "operations": []}
765 self.db.get_one.return_value = cvws
tiernof5f2e3f2020-03-23 14:42:10 +0000766 self.topic._send_msg.reset_mock()
767 self.db.get_one.reset_mock()
768 self.db.del_one.reset_mock()
delacruzramo79e40f42019-10-10 16:36:40 +0200769 self.fake_session["force"] = True # to force deletion
tiernof5f2e3f2020-03-23 14:42:10 +0000770 self.fake_session["admin"] = True # to force deletion
771 self.fake_session["project_id"] = [] # to force deletion
delacruzramo79e40f42019-10-10 16:36:40 +0200772 oid = self.topic.delete(self.fake_session, cid)
773 self.assertIsNone(oid, "Wrong operation identifier")
774 self.assertEqual(self.db.get_one.call_args[0][0], self.topic.topic, "Wrong topic")
775 self.assertEqual(self.db.get_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
776 self.assertEqual(self.db.del_one.call_args[0][0], self.topic.topic, "Wrong topic")
777 self.assertEqual(self.db.del_one.call_args[0][1]["_id"], cid, "Wrong CIM identifier")
tiernof5f2e3f2020-03-23 14:42:10 +0000778 self.topic._send_msg.assert_called_once_with("deleted", {"_id": cid, "op_id": None}, not_send_msg=None)
delacruzramo79e40f42019-10-10 16:36:40 +0200779
780
781if __name__ == '__main__':
782 unittest.main()