X-Git-Url: https://osm.etsi.org/gitweb/?a=blobdiff_plain;f=osm_common%2Ftests%2Ftest_dbbase.py;h=918ced700f2b1f0ca8d51154145633a651fc74db;hb=cfc5272864156b706d7147fc4e7c0fe46dc386c8;hp=c2af52f6bcb1aeb78ac372b70ca07d386b693f72;hpb=b20a902cfc15e04f623b83e8756463540f3e7852;p=osm%2Fcommon.git diff --git a/osm_common/tests/test_dbbase.py b/osm_common/tests/test_dbbase.py index c2af52f..918ced7 100644 --- a/osm_common/tests/test_dbbase.py +++ b/osm_common/tests/test_dbbase.py @@ -1,7 +1,8 @@ import http import pytest - -from osm_common.dbbase import DbBase, DbException +import unittest +from osm_common.dbbase import DbBase, DbException, deep_update +from os import urandom def exception_message(message): @@ -20,7 +21,9 @@ def test_constructor(): def test_db_connect(db_base): - db_base.db_connect(None) + with pytest.raises(DbException) as excinfo: + db_base.db_connect(None) + assert str(excinfo.value).startswith(exception_message("Method 'db_connect' not implemented")) def test_db_disconnect(db_base): @@ -60,3 +63,159 @@ def test_del_one(db_base): db_base.del_one(None, None, None) assert str(excinfo.value).startswith(exception_message("Method 'del_one' not implemented")) assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND + + +class TestEncryption(unittest.TestCase): + def setUp(self): + master_password = "Setting a long master password with numbers 123 and capitals AGHBNHD and symbols %&8)!'" + db_base1 = DbBase() + db_base2 = DbBase() + # set self.secret_key obtained when connect + db_base1.secret_key = DbBase._join_passwords(urandom(32), master_password) + db_base2.secret_key = DbBase._join_passwords(urandom(32), None) + self.db_base = [db_base1, db_base2] + + def test_encrypt_decrypt(self): + TEST = ( + ("plain text 1 ! ", None), + ("plain text 2 with salt ! ", "1afd5d1a-4a7e-4d9c-8c65-251290183106"), + ("plain text 3 with usalt ! ", u"1afd5d1a-4a7e-4d9c-8c65-251290183106"), + (u"plain unicode 4 ! ", None), + (u"plain unicode 5 with salt ! ", "1a000d1a-4a7e-4d9c-8c65-251290183106"), + (u"plain unicode 6 with usalt ! ", u"1abcdd1a-4a7e-4d9c-8c65-251290183106"), + ) + for db_base in self.db_base: + for value, salt in TEST: + # no encryption + encrypted = db_base.encrypt(value, schema_version='1.0', salt=salt) + self.assertEqual(encrypted, value, "value '{}' has been encrypted".format(value)) + decrypted = db_base.decrypt(encrypted, schema_version='1.0', salt=salt) + self.assertEqual(decrypted, value, "value '{}' has been decrypted".format(value)) + + # encrypt/decrypt + encrypted = db_base.encrypt(value, schema_version='1.1', salt=salt) + self.assertNotEqual(encrypted, value, "value '{}' has not been encrypted".format(value)) + self.assertIsInstance(encrypted, str, "Encrypted is not ascii text") + decrypted = db_base.decrypt(encrypted, schema_version='1.1', salt=salt) + self.assertEqual(decrypted, value, "value is not equal after encryption/decryption") + + def test_encrypt_decrypt_salt(self): + value = "value to be encrypted!" + encrypted = [] + for db_base in self.db_base: + for salt in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"): + # encrypt/decrypt + encrypted.append(db_base.encrypt(value, schema_version='1.1', salt=salt)) + self.assertNotEqual(encrypted[-1], value, "value '{}' has not been encrypted".format(value)) + self.assertIsInstance(encrypted[-1], str, "Encrypted is not ascii text") + decrypted = db_base.decrypt(encrypted[-1], schema_version='1.1', salt=salt) + self.assertEqual(decrypted, value, "value is not equal after encryption/decryption") + for i in range(0, len(encrypted)): + for j in range(i+1, len(encrypted)): + self.assertNotEqual(encrypted[i], encrypted[j], + "encryption with different salt contains different result") + + +class TestDeepUpdate(unittest.TestCase): + def test_update_dict(self): + # Original, patch, expected result + TEST = ( + ({"a": "b"}, {"a": "c"}, {"a": "c"}), + ({"a": "b"}, {"b": "c"}, {"a": "b", "b": "c"}), + ({"a": "b"}, {"a": None}, {}), + ({"a": "b", "b": "c"}, {"a": None}, {"b": "c"}), + ({"a": ["b"]}, {"a": "c"}, {"a": "c"}), + ({"a": "c"}, {"a": ["b"]}, {"a": ["b"]}), + ({"a": {"b": "c"}}, {"a": {"b": "d", "c": None}}, {"a": {"b": "d"}}), + ({"a": [{"b": "c"}]}, {"a": [1]}, {"a": [1]}), + ({1: ["a", "b"]}, {1: ["c", "d"]}, {1: ["c", "d"]}), + ({1: {"a": "b"}}, {1: ["c"]}, {1: ["c"]}), + ({1: {"a": "foo"}}, {1: None}, {}), + ({1: {"a": "foo"}}, {1: "bar"}, {1: "bar"}), + ({"e": None}, {"a": 1}, {"e": None, "a": 1}), + ({1: [1, 2]}, {1: {"a": "b", "c": None}}, {1: {"a": "b"}}), + ({}, {"a": {"bb": {"ccc": None}}}, {"a": {"bb": {}}}), + ) + for t in TEST: + deep_update(t[0], t[1]) + self.assertEqual(t[0], t[2]) + # test deepcopy is done. So that original dictionary does not reference the pach + test_original = {1: {"a": "b"}} + test_patch = {1: {"c": {"d": "e"}}} + test_result = {1: {"a": "b", "c": {"d": "e"}}} + deep_update(test_original, test_patch) + self.assertEqual(test_original, test_result) + test_patch[1]["c"]["f"] = "edition of patch, must not modify original" + self.assertEqual(test_original, test_result) + + def test_update_array(self): + # This TEST contains a list with the the Original, patch, and expected result + TEST = ( + # delete all instances of "a"/"d" + ({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}), + ({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}), + # delete and insert at 0 + ({"A": ["a", "b", "c"]}, {"A": {"$b": None, "$+[0]": "b"}}, {"A": ["b", "a", "c"]}), + # delete and edit + ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[1]": {"c": "d"}}}, {"A": [{"c": "d"}]}), + # insert if not exist + ({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}), + ({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}), + # edit by filter + ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}}}, {"A": ["a", {"c": "d"}, "a"]}), + ({"A": ["a", "b", "a"]}, {"A": {"$b": None, "$+[0]": "b", "$+": "c"}}, {"A": ["b", "a", "a", "c"]}), + ({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}), + # index deletion out of range + ({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}), + # nested array->dict + ({"A": ["a", "b", {"id": "1", "c": {"d": 2}}]}, {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}}, + {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]}), + ({"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]}, + {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}}, + {"A": [{"id": 1, "c": {"d": "e", "f": "g"}}, {"id": 1, "c": {"d": "e", "f": "g"}}]}), + # nested array->array + ({"A": ["a", "b", ["a", "b"]]}, {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}}, + {"A": ["a", ["a", {}, "c"]]}), + # types str and int different, so not found + ({"A": ["a", {"id": "1", "c": "d"}]}, {"A": {"$id: 1": {"c": "e"}}}, {"A": ["a", {"id": "1", "c": "d"}]}), + + ) + for t in TEST: + print(t) + deep_update(t[0], t[1]) + self.assertEqual(t[0], t[2]) + + def test_update_badformat(self): + # This TEST contains original, incorrect patch and #TODO text that must be present + TEST = ( + # conflict, index 0 is edited twice + ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[0]": {"c": "d"}}}), + # conflict, two insertions at same index + ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[-2]": "d"}}), + ({"A": ["a", "b", "a"]}, {"A": {"$[1]": "c", "$[+1]": "d"}}), + # bad format keys with and without $ + ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}, "c": 3}}), + # bad format empty $ and yaml incorrect + ({"A": ["a", "b", "a"]}, {"A": {"$": 3}}), + ({"A": ["a", "b", "a"]}, {"A": {"$a: b: c": 3}}), + ({"A": ["a", "b", "a"]}, {"A": {"$a: b, c: d": 3}}), + # insertion of None + ({"A": ["a", "b", "a"]}, {"A": {"$+": None}}), + # Not found, insertion of None + ({"A": ["a", "b", "a"]}, {"A": {"$+c": None}}), + # index edition out of range + ({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}), + # conflict, two editions on index 2 + ({"A": ["a", {"id": "1", "c": "d"}]}, {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}}), + ) + for t in TEST: + print(t) + self.assertRaises(DbException, deep_update, t[0], t[1]) + try: + deep_update(t[0], t[1]) + except DbException as e: + print(e) + + +if __name__ == '__main__': + unittest.main()