+# Copyright 2018 Whitestack, LLC
+# Copyright 2018 Telefonica S.A.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# For those usages not covered by the Apache License, Version 2.0 please
+# contact: esousa@whitestack.com or alfonso.tiernosepulveda@telefonica.com
+##
+
import http
import pytest
import unittest
from osm_common.dbbase import DbBase, DbException, deep_update
from os import urandom
+from http import HTTPStatus
def exception_message(message):
def test_db_connect(db_base):
with pytest.raises(DbException) as excinfo:
db_base.db_connect(None)
- assert str(excinfo.value).startswith(exception_message("Method 'db_connect' not implemented"))
+ assert str(excinfo.value).startswith(
+ exception_message("Method 'db_connect' not implemented")
+ )
def test_db_disconnect(db_base):
def test_get_list(db_base):
with pytest.raises(DbException) as excinfo:
db_base.get_list(None, None)
- assert str(excinfo.value).startswith(exception_message("Method 'get_list' not implemented"))
+ assert str(excinfo.value).startswith(
+ exception_message("Method 'get_list' not implemented")
+ )
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
def test_get_one(db_base):
with pytest.raises(DbException) as excinfo:
db_base.get_one(None, None, None, None)
- assert str(excinfo.value).startswith(exception_message("Method 'get_one' not implemented"))
+ assert str(excinfo.value).startswith(
+ exception_message("Method 'get_one' not implemented")
+ )
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
def test_create(db_base):
with pytest.raises(DbException) as excinfo:
db_base.create(None, None)
- assert str(excinfo.value).startswith(exception_message("Method 'create' not implemented"))
+ assert str(excinfo.value).startswith(
+ exception_message("Method 'create' not implemented")
+ )
+ assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
+
+
+def test_create_list(db_base):
+ with pytest.raises(DbException) as excinfo:
+ db_base.create_list(None, None)
+ assert str(excinfo.value).startswith(
+ exception_message("Method 'create_list' not implemented")
+ )
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
def test_del_list(db_base):
with pytest.raises(DbException) as excinfo:
db_base.del_list(None, None)
- assert str(excinfo.value).startswith(exception_message("Method 'del_list' not implemented"))
+ assert str(excinfo.value).startswith(
+ exception_message("Method 'del_list' not implemented")
+ )
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
def test_del_one(db_base):
with pytest.raises(DbException) as excinfo:
db_base.del_one(None, None, None)
- assert str(excinfo.value).startswith(exception_message("Method 'del_one' not implemented"))
+ assert str(excinfo.value).startswith(
+ exception_message("Method 'del_one' not implemented")
+ )
assert excinfo.value.http_code == http.HTTPStatus.NOT_FOUND
class TestEncryption(unittest.TestCase):
def setUp(self):
- master_password = "Setting a long master password with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
+ master_key = "Setting a long master key with numbers 123 and capitals AGHBNHD and symbols %&8)!'"
db_base1 = DbBase()
db_base2 = DbBase()
+ db_base3 = DbBase()
# set self.secret_key obtained when connect
- db_base1.secret_key = DbBase._join_passwords(urandom(32), master_password)
- db_base2.secret_key = DbBase._join_passwords(urandom(32), None)
- self.db_base = [db_base1, db_base2]
+ db_base1.set_secret_key(master_key, replace=True)
+ db_base1.set_secret_key(urandom(32))
+ db_base2.set_secret_key(None, replace=True)
+ db_base2.set_secret_key(urandom(30))
+ db_base3.set_secret_key(master_key)
+ self.db_bases = [db_base1, db_base2, db_base3]
def test_encrypt_decrypt(self):
TEST = (
(u"plain unicode 5 with salt ! ", "1a000d1a-4a7e-4d9c-8c65-251290183106"),
(u"plain unicode 6 with usalt ! ", u"1abcdd1a-4a7e-4d9c-8c65-251290183106"),
)
- for db_base in self.db_base:
+ for db_base in self.db_bases:
for value, salt in TEST:
# no encryption
- encrypted = db_base.encrypt(value, schema_version='1.0', salt=salt)
- self.assertEqual(encrypted, value, "value '{}' has been encrypted".format(value))
- decrypted = db_base.decrypt(encrypted, schema_version='1.0', salt=salt)
- self.assertEqual(decrypted, value, "value '{}' has been decrypted".format(value))
+ encrypted = db_base.encrypt(value, schema_version="1.0", salt=salt)
+ self.assertEqual(
+ encrypted, value, "value '{}' has been encrypted".format(value)
+ )
+ decrypted = db_base.decrypt(encrypted, schema_version="1.0", salt=salt)
+ self.assertEqual(
+ decrypted, value, "value '{}' has been decrypted".format(value)
+ )
# encrypt/decrypt
- encrypted = db_base.encrypt(value, schema_version='1.1', salt=salt)
- self.assertNotEqual(encrypted, value, "value '{}' has not been encrypted".format(value))
+ encrypted = db_base.encrypt(value, schema_version="1.1", salt=salt)
+ self.assertNotEqual(
+ encrypted, value, "value '{}' has not been encrypted".format(value)
+ )
self.assertIsInstance(encrypted, str, "Encrypted is not ascii text")
- decrypted = db_base.decrypt(encrypted, schema_version='1.1', salt=salt)
- self.assertEqual(decrypted, value, "value is not equal after encryption/decryption")
+ decrypted = db_base.decrypt(encrypted, schema_version="1.1", salt=salt)
+ self.assertEqual(
+ decrypted, value, "value is not equal after encryption/decryption"
+ )
def test_encrypt_decrypt_salt(self):
value = "value to be encrypted!"
encrypted = []
- for db_base in self.db_base:
+ for db_base in self.db_bases:
for salt in (None, "salt 1", "1afd5d1a-4a7e-4d9c-8c65-251290183106"):
# encrypt/decrypt
- encrypted.append(db_base.encrypt(value, schema_version='1.1', salt=salt))
- self.assertNotEqual(encrypted[-1], value, "value '{}' has not been encrypted".format(value))
+ encrypted.append(
+ db_base.encrypt(value, schema_version="1.1", salt=salt)
+ )
+ self.assertNotEqual(
+ encrypted[-1],
+ value,
+ "value '{}' has not been encrypted".format(value),
+ )
self.assertIsInstance(encrypted[-1], str, "Encrypted is not ascii text")
- decrypted = db_base.decrypt(encrypted[-1], schema_version='1.1', salt=salt)
- self.assertEqual(decrypted, value, "value is not equal after encryption/decryption")
+ decrypted = db_base.decrypt(
+ encrypted[-1], schema_version="1.1", salt=salt
+ )
+ self.assertEqual(
+ decrypted, value, "value is not equal after encryption/decryption"
+ )
for i in range(0, len(encrypted)):
- for j in range(i+1, len(encrypted)):
- self.assertNotEqual(encrypted[i], encrypted[j],
- "encryption with different salt contains different result")
+ for j in range(i + 1, len(encrypted)):
+ self.assertNotEqual(
+ encrypted[i],
+ encrypted[j],
+ "encryption with different salt must contain different result",
+ )
+ # decrypt with a different master key
+ try:
+ decrypted = self.db_bases[-1].decrypt(
+ encrypted[0], schema_version="1.1", salt=None
+ )
+ self.assertNotEqual(
+ encrypted[0],
+ decrypted,
+ "Decryption with different KEY must generate different result",
+ )
+ except DbException as e:
+ self.assertEqual(
+ e.http_code,
+ HTTPStatus.INTERNAL_SERVER_ERROR,
+ "Decryption with different KEY does not provide expected http_code",
+ )
class TestDeepUpdate(unittest.TestCase):
({"A": ["a", "b", "a"]}, {"A": {"$a": None}}, {"A": ["b"]}),
({"A": ["a", "b", "a"]}, {"A": {"$d": None}}, {"A": ["a", "b", "a"]}),
# delete and insert at 0
- ({"A": ["a", "b", "c"]}, {"A": {"$b": None, "$+[0]": "b"}}, {"A": ["b", "a", "c"]}),
+ (
+ {"A": ["a", "b", "c"]},
+ {"A": {"$b": None, "$+[0]": "b"}},
+ {"A": ["b", "a", "c"]},
+ ),
# delete and edit
- ({"A": ["a", "b", "a"]}, {"A": {"$a": None, "$[1]": {"c": "d"}}}, {"A": [{"c": "d"}]}),
+ (
+ {"A": ["a", "b", "a"]},
+ {"A": {"$a": None, "$[1]": {"c": "d"}}},
+ {"A": [{"c": "d"}]},
+ ),
# insert if not exist
({"A": ["a", "b", "c"]}, {"A": {"$+b": "b"}}, {"A": ["a", "b", "c"]}),
({"A": ["a", "b", "c"]}, {"A": {"$+d": "f"}}, {"A": ["a", "b", "c", "f"]}),
# edit by filter
- ({"A": ["a", "b", "a"]}, {"A": {"$b": {"c": "d"}}}, {"A": ["a", {"c": "d"}, "a"]}),
- ({"A": ["a", "b", "a"]}, {"A": {"$b": None, "$+[0]": "b", "$+": "c"}}, {"A": ["b", "a", "a", "c"]}),
+ (
+ {"A": ["a", "b", "a"]},
+ {"A": {"$b": {"c": "d"}}},
+ {"A": ["a", {"c": "d"}, "a"]},
+ ),
+ (
+ {"A": ["a", "b", "a"]},
+ {"A": {"$b": None, "$+[0]": "b", "$+": "c"}},
+ {"A": ["b", "a", "a", "c"]},
+ ),
({"A": ["a", "b", "a"]}, {"A": {"$c": None}}, {"A": ["a", "b", "a"]}),
# index deletion out of range
({"A": ["a", "b", "a"]}, {"A": {"$[5]": None}}, {"A": ["a", "b", "a"]}),
# nested array->dict
- ({"A": ["a", "b", {"id": "1", "c": {"d": 2}}]}, {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
- {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]}),
- ({"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
- {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
- {"A": [{"id": 1, "c": {"d": "e", "f": "g"}}, {"id": 1, "c": {"d": "e", "f": "g"}}]}),
+ (
+ {"A": ["a", "b", {"id": "1", "c": {"d": 2}}]},
+ {"A": {"$id: '1'": {"h": None, "c": {"d": "e", "f": "g"}}}},
+ {"A": ["a", "b", {"id": "1", "c": {"d": "e", "f": "g"}}]},
+ ),
+ (
+ {"A": [{"id": 1, "c": {"d": 2}}, {"id": 1, "c": {"f": []}}]},
+ {"A": {"$id: 1": {"h": None, "c": {"d": "e", "f": "g"}}}},
+ {
+ "A": [
+ {"id": 1, "c": {"d": "e", "f": "g"}},
+ {"id": 1, "c": {"d": "e", "f": "g"}},
+ ]
+ },
+ ),
# nested array->array
- ({"A": ["a", "b", ["a", "b"]]}, {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
- {"A": ["a", ["a", {}, "c"]]}),
+ (
+ {"A": ["a", "b", ["a", "b"]]},
+ {"A": {"$b": None, "$[2]": {"$b": {}, "$+": "c"}}},
+ {"A": ["a", ["a", {}, "c"]]},
+ ),
# types str and int different, so not found
- ({"A": ["a", {"id": "1", "c": "d"}]}, {"A": {"$id: 1": {"c": "e"}}}, {"A": ["a", {"id": "1", "c": "d"}]}),
-
+ (
+ {"A": ["a", {"id": "1", "c": "d"}]},
+ {"A": {"$id: 1": {"c": "e"}}},
+ {"A": ["a", {"id": "1", "c": "d"}]},
+ ),
)
for t in TEST:
print(t)
# index edition out of range
({"A": ["a", "b", "a"]}, {"A": {"$[5]": 6}}),
# conflict, two editions on index 2
- ({"A": ["a", {"id": "1", "c": "d"}]}, {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}}),
+ (
+ {"A": ["a", {"id": "1", "c": "d"}]},
+ {"A": {"$id: '1'": {"c": "e"}, "$c: d": {"c": "f"}}},
+ ),
)
for t in TEST:
print(t)
print(e)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main()